CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Static Public Member Functions | Private Member Functions | Private Attributes | Static Private Attributes
evf::FUEventProcessor Class Reference

#include <FUEventProcessor.h>

Inheritance diagram for evf::FUEventProcessor:

Public Member Functions

void actionPerformed (xdata::Event &e)
 
bool configuring (toolbox::task::WorkLoop *wl)
 
void css (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void defaultWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
bool enabling (toolbox::task::WorkLoop *wl)
 
xoap::MessageReference fsmCallback (xoap::MessageReference msg) throw (xoap::exception::Exception)
 
 FUEventProcessor (xdaq::ApplicationStub *s)
 
void getSlavePids (xgi::Input *in, xgi::Output *out)
 
bool halting (toolbox::task::WorkLoop *wl)
 
void handleSignalSlave (int sig, siginfo_t *info, void *c)
 
void microState (xgi::Input *in, xgi::Output *out)
 
void moduleWeb (xgi::Input *in, xgi::Output *out)
 
void pathNames (xgi::Input *, xgi::Output *) throw (xgi::exception::Exception)
 
void procStat (xgi::Input *in, xgi::Output *out)
 
void scalersWeb (xgi::Input *, xgi::Output *) throw (xgi::exception::Exception)
 
void sendMessageOverMonitorQueue (MsgBuf &)
 
void serviceWeb (xgi::Input *in, xgi::Output *out)
 
void spotlightWebPage (xgi::Input *, xgi::Output *) throw (xgi::exception::Exception)
 
bool stopping (toolbox::task::WorkLoop *wl)
 
void subWeb (xgi::Input *in, xgi::Output *out)
 
void updater (xgi::Input *in, xgi::Output *out)
 
 XDAQ_INSTANTIATOR ()
 
virtual ~FUEventProcessor ()
 

Static Public Member Functions

static void forkProcessFromEDM_helper (void *addr)
 

Private Member Functions

void attachDqmToShm () throw (evf::Exception)
 
void detachDqmFromShm () throw (evf::Exception)
 
bool doEndRunInEDM ()
 
bool enableClassic ()
 
bool enableCommon ()
 
bool enableForkInEDM ()
 
bool enableMPEPSlave ()
 
void forkProcessesFromEDM ()
 
void localLog (std::string)
 
std::string logsAsString ()
 
void makeStaticInfo ()
 
bool receiving (toolbox::task::WorkLoop *wl)
 
bool receivingAndMonitor (toolbox::task::WorkLoop *wl)
 
bool restartForkInEDM (unsigned int slotId)
 
bool scalers (toolbox::task::WorkLoop *wl)
 
void setAttachDqmToShm () throw (evf::Exception)
 
bool sigmon (toolbox::task::WorkLoop *wl)
 
void startReceivingLoop ()
 
void startReceivingMonitorLoop ()
 
void startScalersWorkLoop () throw (evf::Exception)
 
void startSignalMonitorWorkLoop () throw (evf::Exception)
 
void startSummarizeWorkLoop () throw (evf::Exception)
 
void startSupervisorLoop ()
 
bool stopClassic ()
 
void stopSlavesAndAcknowledge ()
 
bool summarize (toolbox::task::WorkLoop *wl)
 
bool supervisor (toolbox::task::WorkLoop *wl)
 

Private Attributes

unsigned long long allProcStats_
 
int anonymousPipe_ [2]
 
xdata::InfoSpace * applicationInfoSpace_
 
toolbox::task::ActionSignature * asReceiveMsgAndExecute_
 
toolbox::task::ActionSignature * asReceiveMsgAndRead_
 
toolbox::task::ActionSignature * asScalers_
 
toolbox::task::ActionSignature * asSignalMonitor_
 
toolbox::task::ActionSignature * asSummarize_
 
toolbox::task::ActionSignature * asSupervisor_
 
xdata::Boolean autoRestartSlaves_
 
xdata::String class_
 
xdata::String configString_
 
std::string configuration_
 
CPUStat * cpustat_
 
unsigned int crashesThisRun_
 
xdata::UnsignedInteger32 crashesToDump_
 
Css css_
 
xdata::Boolean datasetCounting_
 
bool edm_init_done_
 
xdata::Boolean epInitialized_
 
FWEPWrapper evtProcessor_
 
xdata::Boolean exitOnError_
 
xdata::UnsignedInteger32 forkInEDM_
 
moduleweb::ForkInfoObj * forkInfoObj_
 
pthread_mutex_t forkObjLock_
 
evf::StateMachine fsm_
 
xdata::Boolean hasModuleWebRegistry_
 
xdata::Boolean hasPrescaleService_
 
xdata::Boolean hasServiceWebRegistry_
 
xdata::Boolean hasShMem_
 
xdata::Boolean iDieStatisticsGathering_
 
xdata::String iDieUrl_
 
unsigned long long idleProcStats_
 
xdata::UnsignedInteger32 instance_
 
xdata::Boolean isRunNumberSetter_
 
timeval lastCrashTime_
 
timeval lastProcReport_
 
Logger log_
 
std::vector< std::string > logRing_
 
unsigned int logRingIndex_
 
bool logWrap_
 
MsgBuf master_message_prg_
 
MsgBuf master_message_prr_
 
MsgBuf master_message_trr_
 
std::auto_ptr< edm::Presence > messageServicePresence_
 
xdata::InfoSpace * monitorInfoSpace_
 
xdata::InfoSpace * monitorLegendaInfoSpace_
 
ModuleWebRegistry * mwrRef_
 
SubProcess * myProcess_
 
std::list< std::string > names_
 
xdata::Serializable * nbAccepted
 
unsigned int nbdead_
 
unsigned int nblive_
 
xdata::Serializable * nbProcessed
 
xdata::UnsignedInteger32 nbSubProcesses_
 
xdata::UnsignedInteger32 nbSubProcessesReporting_
 
unsigned int nbTotalDQM_
 
bool outprev_
 
xdata::Boolean outPut_
 
pthread_mutex_t pickup_lock_
 
RateStat * ratestat_
 
std::string reasonForFailedState_
 
bool receiving_
 
bool receivingM_
 
bool restart_in_progress_
 
bool rlimit_coresize_changed_
 
rlimit rlimit_coresize_default_
 
xdata::UnsignedInteger32 runNumber_
 
xdata::InfoSpace * scalersInfoSpace_
 
xdata::InfoSpace * scalersLegendaInfoSpace_
 
unsigned int scalersUpdates_
 
sem_t * sigmon_sem_
 
bool signalMonitorActive_
 
MsgBuf slave_message_monitoring_
 
MsgBuf slave_message_prr_
 
xdata::UnsignedInteger32 slaveRestartDelaySecs_
 
ShmOutputModuleRegistry * sorRef_
 
std::string sourceId_
 
xdata::Vector< xdata::Integer > spMStates_
 
xdata::Vector< xdata::Integer > spmStates_
 
SquidNet squidnet_
 
xdata::Boolean squidPresent_
 
pthread_mutex_t start_lock_
 
pthread_mutex_t stop_lock_
 
std::vector< SubProcess > subs_
 
xdata::UnsignedInteger32 superSleepSec_
 
bool supervising_
 
std::string updaterStatic_
 
xdata::String url_
 
pid_t vp_
 
Vulture * vulture_
 
toolbox::task::WorkLoop * wlReceiving_
 
toolbox::task::WorkLoop * wlReceivingMonitor_
 
toolbox::task::WorkLoop * wlScalers_
 
bool wlScalersActive_
 
toolbox::task::WorkLoop * wlSignalMonitor_
 
toolbox::task::WorkLoop * wlSummarize_
 
bool wlSummarizeActive_
 
toolbox::task::WorkLoop * wlSupervising_
 

Static Private Attributes

static const unsigned int logRingSize_ = 50
 

Detailed Description

Definition at line 64 of file FUEventProcessor.h.

Constructor & Destructor Documentation

FUEventProcessor::FUEventProcessor ( xdaq::ApplicationStub *  s)

Definition at line 87 of file FUEventProcessor.cc.

References anonymousPipe_, applicationInfoSpace_, autoRestartSlaves_, evf::SquidNet::check(), class_, configString_, gather_cfg::cout, crashesToDump_, css(), datasetCounting_, defaultWebPage(), epInitialized_, evtProcessor_, evf::StateMachine::findRcmsStateListener(), forkInEDM_, forkInfoObj_, forkObjLock_, evf::StateMachine::foundRcmsStateListener(), fsm_, evf::FUInstancePtr_, edm::PresenceFactory::get(), getSlavePids(), hasModuleWebRegistry_, hasPrescaleService_, hasServiceWebRegistry_, hasShMem_, iDieStatisticsGathering_, iDieUrl_, evf::StateMachine::initialize(), instance_, isRunNumberSetter_, edm::PresenceFactory::makePresence(), makeStaticInfo(), messageServicePresence_, microState(), moduleWeb(), monitorInfoSpace_, monitorLegendaInfoSpace_, names_, nbSubProcesses_, nbSubProcessesReporting_, NULL, outPut_, pathNames(), pickup_lock_, pipe::pipe(), procStat(), evf::FWEPWrapper::publishConfigAndMonitorItems(), evf::StateMachine::rcmsStateListener(), rlimit_coresize_default_, runNumber_, scalersInfoSpace_, scalersLegendaInfoSpace_, scalersWeb(), serviceWeb(), evf::FWEPWrapper::setAppCtxt(), evf::FWEPWrapper::setAppDesc(), ML::MLlog4cplus::setAppl(), evf::FWEPWrapper::setApplicationInfoSpace(), evf::FWEPWrapper::setMonitorInfoSpace(), evf::FWEPWrapper::setRcms(), evf::FWEPWrapper::setScalersInfoSpace(), sigmon_sem_, slaveRestartDelaySecs_, sourceId_, spMStates_, spmStates_, spotlightWebPage(), squidnet_, squidPresent_, start_lock_, startSupervisorLoop(), evf::StateMachine::stateName(), stop_lock_, subWeb(), superSleepSec_, updater(), url_, and vulture_.

88  : xdaq::Application(s)
89  , fsm_(this)
90  , log_(getApplicationLogger())
91  , evtProcessor_(log_, getApplicationDescriptor()->getInstance())
92  , runNumber_(0)
93  , epInitialized_(false)
94  , outPut_(true)
95  , autoRestartSlaves_(false)
97  , hasShMem_(true)
98  , hasPrescaleService_(true)
99  , hasModuleWebRegistry_(true)
100  , hasServiceWebRegistry_(true)
101  , isRunNumberSetter_(true)
102  , iDieStatisticsGathering_(false)
103  , outprev_(true)
104  , exitOnError_(true)
106  , squidnet_(3128,"http://localhost:8000/RELEASE-NOTES.txt")
109  , logWrap_(false)
110  , nbSubProcesses_(0)
112  , forkInEDM_(true)
113  , nblive_(0)
114  , nbdead_(0)
115  , nbTotalDQM_(0)
116  , wlReceiving_(0)
118  , receiving_(false)
121  , receivingM_(false)
122  , myProcess_(0)
123  , wlSupervising_(0)
124  , asSupervisor_(0)
125  , supervising_(false)
126  , wlSignalMonitor_(0)
127  , asSignalMonitor_(0)
128  , signalMonitorActive_(false)
129  , monitorInfoSpace_(0)
132  , nbProcessed(0)
133  , nbAccepted(0)
134  , scalersInfoSpace_(0)
136  , wlScalers_(0)
137  , asScalers_(0)
138  , wlScalersActive_(false)
139  , scalersUpdates_(0)
140  , wlSummarize_(0)
141  , asSummarize_(0)
142  , wlSummarizeActive_(false)
143  , superSleepSec_(1)
144  , iDieUrl_("none")
145  , vulture_(0)
146  , vp_(0)
147  , cpustat_(0)
148  , ratestat_(0)
149  , mwrRef_(nullptr)
150  , sorRef_(nullptr)
155  , edm_init_done_(true)
156  , crashesThisRun_(false)
157  , rlimit_coresize_changed_(false)
158  , crashesToDump_(2)
159  , sigmon_sem_(0)
160  , datasetCounting_(true)
161 {
162  using namespace utils;
163 
164  FUInstancePtr_=this;
165 
166  names_.push_back("nbProcessed" );
167  names_.push_back("nbAccepted" );
168  names_.push_back("epMacroStateInt");
169  names_.push_back("epMicroStateInt");
170  // create pipe for web communication
171  int retpipe = pipe(anonymousPipe_);
172  if(retpipe != 0)
173  LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to create pipe");
174  // check squid presence
176  //pass application parameters to FWEPWrapper
177  evtProcessor_.setAppDesc(getApplicationDescriptor());
178  evtProcessor_.setAppCtxt(getApplicationContext());
179  // bind relevant callbacks to finite state machine
181 
182  //set sourceId_
183  url_ =
184  getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
185  getApplicationDescriptor()->getURN();
186  class_ =getApplicationDescriptor()->getClassName();
187  instance_=getApplicationDescriptor()->getInstance();
188  sourceId_=class_.toString()+"_"+instance_.toString();
189  LOG4CPLUS_INFO(getApplicationLogger(),sourceId_ <<" constructor" );
190  LOG4CPLUS_INFO(getApplicationLogger(),"CMSSW_BASE:"<<getenv("CMSSW_BASE"));
191 
192  getApplicationDescriptor()->setAttribute("icon", "/evf/images/epicon.jpg");
193 
194  xdata::InfoSpace *ispace = getApplicationInfoSpace();
195  applicationInfoSpace_ = ispace;
196 
197  // default configuration
198  ispace->fireItemAvailable("parameterSet", &configString_ );
199  ispace->fireItemAvailable("epInitialized", &epInitialized_ );
200  ispace->fireItemAvailable("stateName", fsm_.stateName() );
201  ispace->fireItemAvailable("runNumber", &runNumber_ );
202  ispace->fireItemAvailable("outputEnabled", &outPut_ );
203 
204  ispace->fireItemAvailable("hasSharedMemory", &hasShMem_);
205  ispace->fireItemAvailable("hasPrescaleService", &hasPrescaleService_ );
206  ispace->fireItemAvailable("hasModuleWebRegistry", &hasModuleWebRegistry_ );
207  ispace->fireItemAvailable("hasServiceWebRegistry", &hasServiceWebRegistry_ );
208  ispace->fireItemAvailable("isRunNumberSetter", &isRunNumberSetter_ );
209  ispace->fireItemAvailable("iDieStatisticsGathering", &iDieStatisticsGathering_);
210  ispace->fireItemAvailable("rcmsStateListener", fsm_.rcmsStateListener() );
211  ispace->fireItemAvailable("foundRcmsStateListener",fsm_.foundRcmsStateListener());
212  ispace->fireItemAvailable("nbSubProcesses", &nbSubProcesses_ );
213  ispace->fireItemAvailable("nbSubProcessesReporting",&nbSubProcessesReporting_ );
214  ispace->fireItemAvailable("forkInEDM" ,&forkInEDM_ );
215  ispace->fireItemAvailable("superSleepSec", &superSleepSec_ );
216  ispace->fireItemAvailable("autoRestartSlaves", &autoRestartSlaves_ );
217  ispace->fireItemAvailable("slaveRestartDelaySecs",&slaveRestartDelaySecs_ );
218  ispace->fireItemAvailable("iDieUrl", &iDieUrl_ );
219  ispace->fireItemAvailable("crashesToDump" ,&crashesToDump_ );
220  ispace->fireItemAvailable("datasetCounting" ,&datasetCounting_ );
221 
222  // Add infospace listeners for exporting data values
223  getApplicationInfoSpace()->addItemChangedListener("parameterSet", this);
224  getApplicationInfoSpace()->addItemChangedListener("outputEnabled", this);
225 
226  // findRcmsStateListener
228 
229  // initialize monitoring infospace
230 
231  std::string monInfoSpaceName="evf-eventprocessor-status-monitor";
232  toolbox::net::URN urn = this->createQualifiedInfoSpace(monInfoSpaceName);
233  monitorInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
234 
235  std::string monLegendaInfoSpaceName="evf-eventprocessor-status-legenda";
236  urn = this->createQualifiedInfoSpace(monLegendaInfoSpaceName);
237  monitorLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
238 
239 
240  monitorInfoSpace_->fireItemAvailable("url", &url_ );
241  monitorInfoSpace_->fireItemAvailable("class", &class_ );
242  monitorInfoSpace_->fireItemAvailable("instance", &instance_ );
243  monitorInfoSpace_->fireItemAvailable("runNumber", &runNumber_ );
244  monitorInfoSpace_->fireItemAvailable("stateName", fsm_.stateName());
245 
246  monitorInfoSpace_->fireItemAvailable("squidPresent", &squidPresent_ );
247 
248  std::string scalersInfoSpaceName="evf-eventprocessor-scalers-monitor";
249  urn = this->createQualifiedInfoSpace(scalersInfoSpaceName);
250  scalersInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
251 
252  std::string scalersLegendaInfoSpaceName="evf-eventprocessor-scalers-legenda";
253  urn = this->createQualifiedInfoSpace(scalersLegendaInfoSpaceName);
254  scalersLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
255 
256 
257 
259  scalersInfoSpace_->fireItemAvailable("instance", &instance_);
260 
264 
265  //subprocess state vectors for MP
266  monitorInfoSpace_->fireItemAvailable("epMacroStateInt", &spMStates_);
267  monitorInfoSpace_->fireItemAvailable("epMicroStateInt", &spmStates_);
268 
269  // Bind web interface
270  xgi::bind(this, &FUEventProcessor::css, "styles.css");
271  xgi::bind(this, &FUEventProcessor::defaultWebPage, "Default" );
272  xgi::bind(this, &FUEventProcessor::spotlightWebPage, "Spotlight" );
273  xgi::bind(this, &FUEventProcessor::scalersWeb, "scalersWeb");
274  xgi::bind(this, &FUEventProcessor::pathNames, "pathNames" );
275  xgi::bind(this, &FUEventProcessor::subWeb, "SubWeb" );
276  xgi::bind(this, &FUEventProcessor::getSlavePids, "getSlavePids");
277  xgi::bind(this, &FUEventProcessor::moduleWeb, "moduleWeb" );
278  xgi::bind(this, &FUEventProcessor::serviceWeb, "serviceWeb");
279  xgi::bind(this, &FUEventProcessor::microState, "microState");
280  xgi::bind(this, &FUEventProcessor::updater, "updater" );
281  xgi::bind(this, &FUEventProcessor::procStat, "procStat" );
282 
283  // instantiate the plugin manager, not referenced here after!
284 
286 
287  //create Message Service thread and pass ownership to auto_ptr that we destroy before fork
288  try{
289  LOG4CPLUS_DEBUG(getApplicationLogger(),
290  "Trying to create message service presence ");
292  if(pf != 0) {
293  messageServicePresence_= pf->makePresence("MessageServicePresence");
294  }
295  else {
296  LOG4CPLUS_ERROR(getApplicationLogger(),
297  "Unable to create message service presence ");
298  }
299  }
300  catch(...) {
301  LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
302  }
304 
305  typedef std::set<xdaq::ApplicationDescriptor*> AppDescSet_t;
306  typedef AppDescSet_t::iterator AppDescIter_t;
307 
308  AppDescSet_t rcms=
309  getApplicationContext()->getDefaultZone()->
310  getApplicationDescriptors("RCMSStateListener");
311  if(rcms.size()==0)
312  {
313  LOG4CPLUS_WARN(getApplicationLogger(),
314  "MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
315  // localLog("-W- MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
316  }
317  else
318  {
319  AppDescIter_t it = rcms.begin();
320  evtProcessor_.setRcms(*it);
321  }
322  pthread_mutex_init(&start_lock_,0);
323  pthread_mutex_init(&stop_lock_,0);
324  pthread_mutex_init(&pickup_lock_,0);
325 
326  forkInfoObj_=nullptr;
327  pthread_mutex_init(&forkObjLock_,0);
328  makeStaticInfo();
330 
331  if(vulture_==0) vulture_ = new Vulture(true);
332 
334 
335  AppDescSet_t setOfiDie=
336  getApplicationContext()->getDefaultZone()->
337  getApplicationDescriptors("evf::iDie");
338 
339  for (AppDescIter_t it=setOfiDie.begin();it!=setOfiDie.end();++it)
340  if ((*it)->getInstance()==0) // there has to be only one instance of iDie
341  iDieUrl_ = (*it)->getContextDescriptor()->getURL() + "/" + (*it)->getURN();
342 
343  //save default core file size
344  getrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
345 
346  //prepare IPC semaphore for getting the workloop waked up on signal caught in slaves
347  #ifdef linux
348  if (sigmon_sem_==0) {
349  sigmon_sem_ = (sem_t*)mmap(NULL, sizeof(sem_t),
350  PROT_READ | PROT_WRITE,
351  MAP_ANONYMOUS | MAP_SHARED, 0, 0);
352  if (!sigmon_sem_) {
353  perror("mmap error\n");
354  std::cout << "mmap error"<<std::endl;
355  }
356  else
357  sem_init(sigmon_sem_,true,0);
358  }
359  #endif
360 }
xdata::Boolean hasModuleWebRegistry_
ShmOutputModuleRegistry * sorRef_
void spotlightWebPage(xgi::Input *, xgi::Output *)
bool check()
Definition: SquidNet.cc:21
xdata::Vector< xdata::Integer > spMStates_
xdata::InfoSpace * monitorInfoSpace_
void subWeb(xgi::Input *in, xgi::Output *out)
std::vector< std::string > logRing_
xdata::UnsignedInteger32 slaveRestartDelaySecs_
xdata::Boolean datasetCounting_
xdata::UnsignedInteger32 runNumber_
void updater(xgi::Input *in, xgi::Output *out)
toolbox::task::WorkLoop * wlScalers_
pthread_mutex_t start_lock_
#define MSQS_MESSAGE_TYPE_TRR
Definition: queue_defs.h:34
xdata::UnsignedInteger32 crashesToDump_
xdata::Boolean isRunNumberSetter_
void setAppCtxt(xdaq::ApplicationContext *ctx)
Definition: FWEPWrapper.h:91
xdata::UnsignedInteger32 forkInEDM_
xdata::Boolean hasServiceWebRegistry_
toolbox::task::WorkLoop * wlReceiving_
xdata::Boolean hasShMem_
def pipe
Definition: pipe.py:5
toolbox::task::ActionSignature * asReceiveMsgAndExecute_
#define NULL
Definition: scimark2.h:8
toolbox::task::ActionSignature * asSignalMonitor_
FUEventProcessor * FUInstancePtr_
xdata::InfoSpace * scalersLegendaInfoSpace_
toolbox::task::ActionSignature * asSupervisor_
xdata::Boolean autoRestartSlaves_
xdata::UnsignedInteger32 nbSubProcessesReporting_
unsigned int crashesThisRun_
void css(xgi::Input *in, xgi::Output *out)
#define MAX_MSG_SIZE
Definition: queue_defs.h:10
toolbox::task::ActionSignature * asReceiveMsgAndRead_
xdata::Boolean exitOnError_
xdata::InfoSpace * monitorLegendaInfoSpace_
void initialize(T *app)
Definition: StateMachine.h:84
void getSlavePids(xgi::Input *in, xgi::Output *out)
xdata::Serializable * nbProcessed
pthread_mutex_t forkObjLock_
#define MSQM_MESSAGE_TYPE_PRG
Definition: queue_defs.h:19
xdata::Bag< xdaq2rc::ClassnameAndInstance > * rcmsStateListener()
Definition: StateMachine.h:72
#define MSQS_MESSAGE_TYPE_PRR
Definition: queue_defs.h:32
evf::StateMachine fsm_
toolbox::task::WorkLoop * wlSupervising_
static void setAppl(xdaq::Application *app)
Definition: MLlog4cplus.cc:85
std::string reasonForFailedState_
xdata::Boolean iDieStatisticsGathering_
void setAppDesc(xdaq::ApplicationDescriptor *ad)
Definition: FWEPWrapper.h:90
void scalersWeb(xgi::Input *, xgi::Output *)
xdata::Boolean squidPresent_
std::list< std::string > names_
void setScalersInfoSpace(xdata::InfoSpace *sis, xdata::InfoSpace *slis)
Definition: FWEPWrapper.h:77
xdata::Boolean hasPrescaleService_
void procStat(xgi::Input *in, xgi::Output *out)
moduleweb::ForkInfoObj * forkInfoObj_
toolbox::task::WorkLoop * wlSummarize_
xdata::Serializable * nbAccepted
static PresenceFactory * get()
void setMonitorInfoSpace(xdata::InfoSpace *mis, xdata::InfoSpace *mlis)
Definition: FWEPWrapper.h:83
ModuleWebRegistry * mwrRef_
xdata::InfoSpace * applicationInfoSpace_
xdata::UnsignedInteger32 nbSubProcesses_
std::auto_ptr< Presence > makePresence(std::string const &presence_type) const
xdata::String * stateName()
Definition: StateMachine.h:69
void setApplicationInfoSpace(xdata::InfoSpace *is)
Definition: FWEPWrapper.h:82
xdata::String configString_
unsigned int scalersUpdates_
xdata::Boolean * foundRcmsStateListener()
Definition: StateMachine.h:78
xdata::UnsignedInteger32 superSleepSec_
std::auto_ptr< edm::Presence > messageServicePresence_
void moduleWeb(xgi::Input *in, xgi::Output *out)
void pathNames(xgi::Input *, xgi::Output *)
pthread_mutex_t pickup_lock_
void defaultWebPage(xgi::Input *in, xgi::Output *out)
toolbox::task::ActionSignature * asSummarize_
toolbox::task::WorkLoop * wlSignalMonitor_
xdata::Vector< xdata::Integer > spmStates_
tuple cout
Definition: gather_cfg.py:121
xdata::UnsignedInteger32 instance_
void serviceWeb(xgi::Input *in, xgi::Output *out)
toolbox::task::WorkLoop * wlReceivingMonitor_
toolbox::task::ActionSignature * asScalers_
pthread_mutex_t stop_lock_
void publishConfigAndMonitorItems(bool)
Definition: FWEPWrapper.cc:112
xdata::InfoSpace * scalersInfoSpace_
void microState(xgi::Input *in, xgi::Output *out)
void setRcms(xdaq::ApplicationDescriptor *rcms)
Definition: FWEPWrapper.h:89
static const unsigned int logRingSize_
xdata::Boolean epInitialized_
void findRcmsStateListener()
FUEventProcessor::~FUEventProcessor ( )
virtual

Definition at line 365 of file FUEventProcessor.cc.

References vulture_.

366 {
367  // no longer needed since the wrapper is a member of the class and one can rely on
368  // implicit destructor - to be revised - at any rate the most common exit path is via "kill"...
369  // if (evtProcessor_) delete evtProcessor_;
370  if(vulture_ != 0) delete vulture_;
371 }

Member Function Documentation

void FUEventProcessor::actionPerformed ( xdata::Event &  e)

Definition at line 810 of file FUEventProcessor.cc.

References alignCSCRings::e, edm::EventProcessor::enableEndPaths(), epInitialized_, evtProcessor_, fsm_, outprev_, outPut_, and evf::StateMachine::stateName().

811 {
812 
813  if (e.type()=="ItemChangedEvent" && fsm_.stateName()->toString()!="Halted") {
814  std::string item = dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
815 
816  if ( item == "parameterSet") {
817  LOG4CPLUS_WARN(getApplicationLogger(),
818  "HLT Menu changed, force reinitialization of EventProcessor");
819  epInitialized_ = false;
820  }
821  if ( item == "outputEnabled") {
822  if(outprev_ != outPut_) {
823  LOG4CPLUS_WARN(getApplicationLogger(),
824  (outprev_ ? "Disabling " : "Enabling ")<<"global output");
826  outprev_ = outPut_;
827  }
828  }
829  if (item == "globalInputPrescale") {
830  LOG4CPLUS_WARN(getApplicationLogger(),
831  "Setting global input prescale has no effect "
832  <<"in this version of the code");
833  }
834  if ( item == "globalOutputPrescale") {
835  LOG4CPLUS_WARN(getApplicationLogger(),
836  "Setting global output prescale has no effect "
837  <<"in this version of the code");
838  }
839  }
840 
841 }
evf::StateMachine fsm_
xdata::String * stateName()
Definition: StateMachine.h:69
xdata::Boolean epInitialized_
void enableEndPaths(bool active)
void FUEventProcessor::attachDqmToShm ( ) throw (evf::Exception)
private

Definition at line 1064 of file FUEventProcessor.cc.

References alignCSCRings::e, evtProcessor_, edm::EventProcessor::getToken(), edm::Service< T >::isAvailable(), cmsPerfStripChart::operate(), summarizeEdmComparisonLogfiles::success, and cms::Exception::what().

Referenced by enableCommon(), and forkProcessesFromEDM().

1065 {
1066  std::string errmsg;
1067  bool success = false;
1068  try {
1071  success = edm::Service<FUShmDQMOutputService>()->attachToShm();
1072  if (!success) errmsg = "Failed to attach DQM service to shared memory";
1073  }
1074  catch (cms::Exception& e) {
1075  errmsg = "Failed to attach DQM service to shared memory: " + (std::string)e.what();
1076  }
1077  if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
1078 }
virtual char const * what() const
Definition: Exception.cc:141
bool isAvailable() const
Definition: Service.h:47
ServiceToken getToken()
bool FUEventProcessor::configuring ( toolbox::task::WorkLoop *  wl)

Definition at line 380 of file FUEventProcessor.cc.

References edm::EventProcessor::beginJob(), configString_, evf::FWEPWrapper::configuration(), configuration_, cpustat_, datasetCounting_, alignCSCRings::e, epInitialized_, evtProcessor_, cppFunctionSkipper::exception, edm::hlt::Exception, cms::Exception::explainSelf(), evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, evf::ShmOutputModuleRegistry::getDatasetCSV(), evf::FWEPWrapper::getmicromap(), evf::FWEPWrapper::getModuleWebRegistry(), evf::FWEPWrapper::getNumberOfMicrostates(), evf::FWEPWrapper::getShmOutputModuleRegistry(), hasModuleWebRegistry_, hasPrescaleService_, hasServiceWebRegistry_, i, iDieStatisticsGathering_, iDieUrl_, evf::FWEPWrapper::init(), instance_, localLog(), evf::Vulture::makeProcess(), mwrRef_, nbSubProcesses_, ratestat_, reasonForFailedState_, scalersLegendaInfoSpace_, evf::RateStat::sendAuxLegenda(), evf::RateStat::sendLegenda(), evf::CPUStat::sendLegenda(), evf::FWEPWrapper::setupFastTimerService(), edm::event_processor::sInit, sorRef_, spMStates_, spmStates_, evf::FWEPWrapper::startMonitoringWorkLoop(), vp_, and vulture_.

381 {
382 // std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
383 // << ((instance_.value_==0) ? 0x8 : 0) << " "
384 // << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
385 // << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
386 // << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
387  unsigned short smap
388  = (datasetCounting_.value_ ? 0x20 : 0 )
389  + ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
390  + (((instance_.value_%80)==0) ? 0x8 : 0) // have at least one legend per slice
391  + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
392  + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
393  + (hasPrescaleService_.value_ ? 0x1 : 0);
394  if(nbSubProcesses_.value_==0)
395  {
396  spMStates_.setSize(1);
397  spmStates_.setSize(1);
398  }
399  else
400  {
401  spMStates_.setSize(nbSubProcesses_.value_);
402  spmStates_.setSize(nbSubProcesses_.value_);
403  for(unsigned int i = 0; i < spMStates_.size(); i++)
404  {
406  spmStates_[i] = 0;
407  }
408  }
409  try {
410  LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ...");
411  std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
412  epInitialized_=true;
413  if(evtProcessor_)
414  {
415  //get ref of mwr
418  // moved to wrapper class
423  if(cpustat_) {delete cpustat_; cpustat_=0;}
425  nbSubProcesses_.value_,
426  instance_.value_,
427  iDieUrl_.value_);
428  if(ratestat_) {delete ratestat_; ratestat_=0;}
429  ratestat_ = new RateStat(iDieUrl_.value_);
430  if(iDieStatisticsGathering_.value_)
431  {
432  try
433  {
435  xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
436  if(legenda !=0)
437  {
438  std::string slegenda = ((xdata::String*)legenda)->value_;
439  ratestat_->sendLegenda(slegenda);
440  }
441  if (sorRef_ && datasetCounting_.value_)
442  {
443  xdata::String dsLegenda = sorRef_->getDatasetCSV();
444  if (dsLegenda.value_.size())
445  ratestat_->sendAuxLegenda(dsLegenda);
446  }
447  }
448  catch(evf::Exception &e)
449  {
450  LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
451  << e.what());
452  }
453  catch (xcept::Exception& e) {
454  LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda."
455  << e.what());
456  }
457  }
458 
459  fsm_.fireEvent("ConfigureDone",this);
460  LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!");
461  localLog("-I- Configuration completed");
462 
463  }
464  }
465  catch (xcept::Exception &e) {
466  reasonForFailedState_ = "configuring FAILED: " + (std::string)e.what();
469  }
470  catch(cms::Exception &e) {
474  }
475  catch(std::exception &e) {
476  reasonForFailedState_ = e.what();
479  }
480  catch(...) {
481  fsm_.fireFailed("Unknown Exception",this);
482  }
483 
484  if(vulture_!=0 && vp_ == 0) vp_ = vulture_->makeProcess();
485 
486  return false;
487 }
xdata::Boolean hasModuleWebRegistry_
ShmOutputModuleRegistry * sorRef_
xdata::Vector< xdata::Integer > spMStates_
int i
Definition: DBlmapReader.cc:9
pid_t makeProcess()
Definition: Vulture.cc:154
xdata::Boolean datasetCounting_
void startMonitoringWorkLoop()
Definition: FWEPWrapper.cc:614
virtual std::string explainSelf() const
Definition: Exception.cc:146
ShmOutputModuleRegistry * getShmOutputModuleRegistry()
Definition: FWEPWrapper.cc:491
void sendAuxLegenda(const std::string &)
Definition: RateStat.cc:29
xdata::Boolean hasServiceWebRegistry_
void init(unsigned short, std::string &)
Definition: FWEPWrapper.cc:193
void setupFastTimerService(unsigned int nProcesses)
Definition: FWEPWrapper.cc:497
xdata::InfoSpace * scalersLegendaInfoSpace_
void fireFailed(const std::string &errorMsg, void *originator)
evf::StateMachine fsm_
ModuleWebRegistry * getModuleWebRegistry()
Definition: FWEPWrapper.cc:485
std::string reasonForFailedState_
xdata::Boolean iDieStatisticsGathering_
xdata::Boolean hasPrescaleService_
unsigned int getNumberOfMicrostates()
Definition: FWEPWrapper.h:140
void fireEvent(const std::string &evtType, void *originator)
void localLog(std::string)
ModuleWebRegistry * mwrRef_
xdata::UnsignedInteger32 nbSubProcesses_
xdata::String configString_
std::vector< std::string > const & getmicromap() const
Definition: FWEPWrapper.h:141
std::string const & configuration() const
Definition: FWEPWrapper.h:102
void sendLegenda(const std::vector< std::string > &)
Definition: CPUStat.cc:39
xdata::Vector< xdata::Integer > spmStates_
void sendLegenda(const std::string &)
Definition: RateStat.cc:24
xdata::UnsignedInteger32 instance_
xdata::Boolean epInitialized_
void evf::FUEventProcessor::css ( xgi::Input * in, xgi::Output * out ) throw (xgi::exception::Exception)
inline

Definition at line 107 of file FUEventProcessor.h.

References evf::Css::css(), and css_.

Referenced by FUEventProcessor().

108  {
109  css_.css(in,out);
110  }
void css(xgi::Input *in, xgi::Output *out)
Definition: Css.h:15
tuple out
Definition: dbtoconf.py:99
void FUEventProcessor::defaultWebPage ( xgi::Input * in, xgi::Output * out ) throw (xgi::exception::Exception)

Definition at line 921 of file FUEventProcessor.cc.

References nbSubProcesses_, and dbtoconf::out.

Referenced by FUEventProcessor(), and receivingAndMonitor().

923 {
924 
925 
926  *out << "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">"
927  << "<html><head><title>" << getApplicationDescriptor()->getClassName() << (nbSubProcesses_.value_ > 0 ? "MP " : " ")
928  << getApplicationDescriptor()->getInstance() << "</title>"
929  << "<meta http-equiv=\"REFRESH\" content=\"0;url=/evf/html/defaultBasePage.html\">"
930  << "</head></html>";
931 }
tuple out
Definition: dbtoconf.py:99
xdata::UnsignedInteger32 nbSubProcesses_
void FUEventProcessor::detachDqmFromShm ( ) throw (evf::Exception)
private

Definition at line 1082 of file FUEventProcessor.cc.

References alignCSCRings::e, evtProcessor_, edm::EventProcessor::getToken(), edm::Service< T >::isAvailable(), cmsPerfStripChart::operate(), summarizeEdmComparisonLogfiles::success, and cms::Exception::what().

Referenced by stopClassic().

1083 {
1084  std::string errmsg;
1085  bool success = false;
1086  try {
1089  success = edm::Service<FUShmDQMOutputService>()->detachFromShm();
1090  if (!success) errmsg = "Failed to detach DQM service from shared memory";
1091  }
1092  catch (cms::Exception& e) {
1093  errmsg = "Failed to detach DQM service from shared memory: " + (std::string)e.what();
1094  }
1095  if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
1096 }
virtual char const * what() const
Definition: Exception.cc:141
bool isAvailable() const
Definition: Service.h:47
ServiceToken getToken()
bool FUEventProcessor::doEndRunInEDM ( )
private

Definition at line 670 of file FUEventProcessor.cc.

References evf::moduleweb::ForkInfoObj::control_sem_, prof2calltree::count, edm_init_done_, evtProcessor_, evf::StateMachine::fireFailed(), forkInfoObj_, fsm_, edm::EventProcessor::getState(), evf::moduleweb::ForkInfoObj::lock(), log_, evf::moduleweb::ForkInfoObj::receivedStop_, edm::event_processor::sJobReady, stor::utils::sleep(), edm::event_processor::sRunning, edm::event_processor::sStopping, edm::EventProcessor::stateName(), evf::moduleweb::ForkInfoObj::stopCondition, and evf::moduleweb::ForkInfoObj::unlock().

Referenced by halting(), and stopping().

670  {
671  //taking care that EP in master is in stoppable state
672  if (forkInfoObj_) {
673 
674  int count = 30;
675  bool waitedForEDM=false;
676  while (!edm_init_done_ && count) {
677  ::sleep(1);
678  if (count%5==0)
679  LOG4CPLUS_WARN(log_,"MASTER EP: Stopping while EP busy in beginRun. waiting " <<count<< "sec");
680  count--;
681  waitedForEDM=true;
682  }
683  //sleep a few more seconds it was early stop
684  if (waitedForEDM) sleep(5);
685 
686  //if (count==0) fsm_.fireFailed("failed to stop Master EP",this);
687 
689  return true;//we are already done
690 
691  forkInfoObj_->lock();
693  sem_post(forkInfoObj_->control_sem_);
694  forkInfoObj_->unlock();
695 
696  count = 30;
697 
699  while(count--) {
700  st = evtProcessor_->getState();
701  if (st==edm::event_processor::sRunning) ::usleep(100000);
703  break;
704  }
705  else {
706  std::ostringstream ost;
707  ost << "Master edm::EventProcessor is in state "<< evtProcessor_->stateName(st) << " while stopping";
708  LOG4CPLUS_ERROR(getApplicationLogger(),ost.str());
709  fsm_.fireFailed(ost.str(),this);
710  return false;
711  }
712  if (count%5==0 && st==edm::event_processor::sRunning && !forkInfoObj_->receivedStop_) {
713  forkInfoObj_->lock();
715  sem_post(forkInfoObj_->control_sem_);
716  forkInfoObj_->unlock();
717  LOG4CPLUS_WARN(getApplicationLogger(),
718  "Master edm::EventProcessor still running after "<< (30-count-1) << " seconds. \"sem_post\" was executed again" );
719  }
720  }
721  if (count<0) {
722  std::ostringstream ost;
724  ost << "Timeout waiting for Master edm::EventProcessor to go stopping state "
725  << evtProcessor_->stateName(st) << ": input source did not receive stop signal!";
726  else
727  ost << "Timeout waiting for Master edm::EventProcessor to go stopping state "<<evtProcessor_->stateName(st);
728  LOG4CPLUS_ERROR(getApplicationLogger(),ost.str());
729  fsm_.fireFailed(ost.str(),this);
730  return false;
731  }
732  }
733  return true;
734 }
event_processor::State getState() const
void sleep(Duration_t)
Definition: Utils.h:163
void fireFailed(const std::string &errorMsg, void *originator)
evf::StateMachine fsm_
unsigned int stopCondition
Definition: ModuleWeb.h:46
moduleweb::ForkInfoObj * forkInfoObj_
char const * stateName(event_processor::State s) const
bool FUEventProcessor::enableClassic ( )
private

Definition at line 2319 of file FUEventProcessor.cc.

References enableCommon(), evtProcessor_, edm::EventProcessor::getState(), localLog(), stor::utils::sleep(), and edm::event_processor::sRunning.

Referenced by enabling().

2320 {
2321  bool retval = enableCommon();
2323  LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
2324  ::sleep(1);
2325  }
2326 
2327  // implementation moved to EPWrapper
2328  // startScalersWorkLoop(); // this is now not done any longer
2329  localLog("-I- Start completed");
2330  return retval;
2331 }
event_processor::State getState() const
void sleep(Duration_t)
Definition: Utils.h:163
void localLog(std::string)
bool FUEventProcessor::enableCommon ( )
private

Definition at line 1954 of file FUEventProcessor.cc.

References attachDqmToShm(), edm::EventProcessor::clearCounters(), gather_cfg::cout, edm::EventProcessor::declareRunNumber(), alignCSCRings::e, evtProcessor_, cppFunctionSkipper::exception, edm::hlt::Exception, cms::Exception::explainSelf(), evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, hasShMem_, isRunNumberSetter_, localLog(), reasonForFailedState_, edm::EventProcessor::runAsync(), runNumber_, edm::EventProcessor::setRunNumber(), stor::utils::sleep(), and edm::EventProcessor::statusAsync().

Referenced by enableClassic(), and enableMPEPSlave().

1955 {
1956  try {
1957  if(hasShMem_) attachDqmToShm();
1958  int sc = 0;
1960  if(isRunNumberSetter_)
1962  else
1964 
1965  try{
1966  ::sleep(1);
1968  sc = evtProcessor_->statusAsync();
1969  }
1970  catch(cms::Exception &e) {
1974  return false;
1975  }
1976  catch(std::exception &e) {
1977  reasonForFailedState_ = e.what();
1980  return false;
1981  }
1982  catch(...) {
1983  reasonForFailedState_ = "Unknown Exception";
1986  return false;
1987  }
1988  if(sc != 0) {
1989  std::ostringstream oss;
1990  oss<<"EventProcessor::runAsync returned status code " << sc;
1991  reasonForFailedState_ = oss.str();
1994  return false;
1995  }
1996  }
1997  catch (xcept::Exception &e) {
1998  reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
2001  return false;
2002  }
2003  try{
2004  fsm_.fireEvent("EnableDone",this);
2005  }
2006  catch (xcept::Exception &e) {
2007  std::cout << "exception " << (std::string)e.what() << std::endl;
2008  throw;
2009  }
2010 
2011  return false;
2012 }
xdata::UnsignedInteger32 runNumber_
virtual std::string explainSelf() const
Definition: Exception.cc:146
xdata::Boolean isRunNumberSetter_
void clearCounters()
Clears counters used by trigger report.
xdata::Boolean hasShMem_
void setRunNumber(RunNumber_t runNumber)
void sleep(Duration_t)
Definition: Utils.h:163
void fireFailed(const std::string &errorMsg, void *originator)
evf::StateMachine fsm_
void declareRunNumber(RunNumber_t runNumber)
std::string reasonForFailedState_
void fireEvent(const std::string &evtType, void *originator)
void localLog(std::string)
StatusCode statusAsync() const
tuple cout
Definition: gather_cfg.py:121
bool FUEventProcessor::enableForkInEDM ( )
private

Definition at line 2236 of file FUEventProcessor.cc.

References edm::EventProcessor::clearCounters(), edm::EventProcessor::declareRunNumber(), alignCSCRings::e, evtProcessor_, cppFunctionSkipper::exception, cms::Exception::explainSelf(), evf::StateMachine::fireFailed(), evf::moduleweb::ForkInfoObj::forkHandler, forkInfoObj_, forkObjLock_, evf::moduleweb::ForkInfoObj::forkParams, forkProcessFromEDM_helper(), fsm_, evf::moduleweb::ForkInfoObj::fuAddr, evf::moduleweb::ForkParams::isMaster, isRunNumberSetter_, log_, evf::moduleweb::ForkInfoObj::mst_lock_, mwrRef_, evf::ModuleWebRegistry::publishForkInfo(), reasonForFailedState_, evf::FWEPWrapper::resetWaiting(), evf::moduleweb::ForkParams::restart, edm::EventProcessor::runAsync(), runNumber_, edm::EventProcessor::setRunNumber(), evf::moduleweb::ForkParams::slotId, edm::EventProcessor::statusAsync(), and evf::moduleweb::ForkInfoObj::stopCondition.

Referenced by enabling().

2237 {
2239  try {
2240  //set to connect to Shm later
2241  //if(hasShMem_) setAttachDqmToShm();
2242 
2243  int sc = 0;
2244  //maybe not needed in MP mode
2246  if(isRunNumberSetter_)
2248  else
2250 
2251  //prepare object used to communicate with DaqSource
2252  pthread_mutex_destroy(&forkObjLock_);
2253  pthread_mutex_init(&forkObjLock_,0);
2254  if (forkInfoObj_) delete forkInfoObj_;
2257  forkInfoObj_->fuAddr=(void*)this;
2263  if (mwrRef_)
2264  mwrRef_->publishForkInfo(std::string("DaqSource"),forkInfoObj_);
2265 
2267  sc = evtProcessor_->statusAsync();
2268 
2269  if(sc != 0) {
2270  std::ostringstream oss;
2271  oss<<"EventProcessor::runAsync returned status code " << sc;
2272  reasonForFailedState_ = oss.str();
2274  LOG4CPLUS_FATAL(log_,reasonForFailedState_);
2275  return false;
2276  }
2277  }
2278  //catch exceptions on master side
2279  catch(cms::Exception &e) {
2282  LOG4CPLUS_FATAL(log_,reasonForFailedState_);
2283  return false;
2284  }
2285  catch(std::exception &e) {
2286  reasonForFailedState_ = e.what();
2288  LOG4CPLUS_FATAL(log_,reasonForFailedState_);
2289  return false;
2290  }
2291  catch(...) {
2292  reasonForFailedState_ = "Unknown Exception";
2294  LOG4CPLUS_FATAL(log_,reasonForFailedState_);
2295  return false;
2296  }
2297  return true;
2298 }
xdata::UnsignedInteger32 runNumber_
virtual std::string explainSelf() const
Definition: Exception.cc:146
void publishForkInfo(std::string name, moduleweb::ForkInfoObj *forkInfoObj)
xdata::Boolean isRunNumberSetter_
void clearCounters()
Clears counters used by trigger report.
void(* forkHandler)(void *)
Definition: ModuleWeb.h:44
void setRunNumber(RunNumber_t runNumber)
pthread_mutex_t forkObjLock_
void fireFailed(const std::string &errorMsg, void *originator)
evf::StateMachine fsm_
unsigned int stopCondition
Definition: ModuleWeb.h:46
void declareRunNumber(RunNumber_t runNumber)
std::string reasonForFailedState_
moduleweb::ForkInfoObj * forkInfoObj_
static void forkProcessFromEDM_helper(void *addr)
ModuleWebRegistry * mwrRef_
StatusCode statusAsync() const
pthread_mutex_t * mst_lock_
Definition: ModuleWeb.h:49
bool FUEventProcessor::enableMPEPSlave ( )
private

Definition at line 2332 of file FUEventProcessor.cc.

References alignCSCRings::e, enableCommon(), evtProcessor_, edm::hlt::Exception, evf::StateMachine::fireFailed(), fsm_, edm::PresenceFactory::get(), evf::FWEPWrapper::isWaitingForLs(), localLog(), edm::PresenceFactory::makePresence(), reasonForFailedState_, evf::FWEPWrapper::resetWaiting(), ML::MLlog4cplus::setAppl(), stor::utils::sleep(), startReceivingLoop(), startReceivingMonitorLoop(), and startScalersWorkLoop().

Referenced by enabling(), and supervisor().

2333 {
2334  //all this happens only in the child process
2335 
2340  while(!evtProcessor_.isWaitingForLs())
2341  ::sleep(1);
2342 
2343  // @EM test do not run monitor loop in slave, only receiving&Monitor
2344  // evtProcessor_.startMonitoringWorkLoop();
2345  try{
2346  // evtProcessor_.makeServicesOnly();
2347  try{
2349  if(pf != 0) {
2350  pf->makePresence("MessageServicePresence").release();
2351  }
2352  else {
2353  LOG4CPLUS_ERROR(getApplicationLogger(),
2354  "Unable to create message service presence ");
2355  }
2356  }
2357  catch(...) {
2358  LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
2359  }
2361  }
2362  catch (xcept::Exception &e) {
2363  reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
2366  }
2367  bool retval = enableCommon();
2368  // while(evtProcessor_->getState()!= edm::event_processor::sRunning){
2369  // LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
2370  // ::sleep(1);
2371  // }
2372  return retval;
2373 }
void sleep(Duration_t)
Definition: Utils.h:163
void fireFailed(const std::string &errorMsg, void *originator)
evf::StateMachine fsm_
static void setAppl(xdaq::Application *app)
Definition: MLlog4cplus.cc:85
std::string reasonForFailedState_
void localLog(std::string)
static PresenceFactory * get()
std::auto_ptr< Presence > makePresence(std::string const &presence_type) const
bool isWaitingForLs()
Definition: FWEPWrapper.h:135
bool FUEventProcessor::enabling ( toolbox::task::WorkLoop *  wl)

Definition at line 493 of file FUEventProcessor.cc.

References toolbox::mem::_s_mutex_ptr_, allProcStats_, edm::EventProcessor::beginJob(), configString_, evf::FWEPWrapper::configuration(), configuration_, cpustat_, crashesThisRun_, datasetCounting_, evf::StateMachine::disableRcmsStateNotification(), alignCSCRings::e, edm_init_done_, enableClassic(), enableForkInEDM(), enableMPEPSlave(), epInitialized_, evtProcessor_, edm::hlt::Exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), evf::FWEPWrapper::forceInitEventProcessorMaybe(), forkInEDM_, fsm_, evf::ShmOutputModuleRegistry::getDatasetCSV(), evf::FWEPWrapper::getmicromap(), evf::FWEPWrapper::getModuleWebRegistry(), evf::FWEPWrapper::getNumberOfMicrostates(), evf::FWEPWrapper::getShmOutputModuleRegistry(), evf::ShmOutputModuleRegistry::getShmOutputModules(), edm::EventProcessor::getState(), hasModuleWebRegistry_, hasPrescaleService_, hasServiceWebRegistry_, i, iDieStatisticsGathering_, iDieUrl_, idleProcStats_, evf::FWEPWrapper::init(), instance_, localLog(), mwrRef_, myProcess_, nbSubProcesses_, nbTotalDQM_, pickup_lock_, ratestat_, reasonForFailedState_, evf::FWEPWrapper::resetLumiSectionReferenceIndex(), rlimit_coresize_changed_, rlimit_coresize_default_, runNumber_, scalersLegendaInfoSpace_, scalersUpdates_, evf::RateStat::sendAuxLegenda(), evf::RateStat::sendLegenda(), evf::CPUStat::sendLegenda(), edm::event_processor::sError, evf::FWEPWrapper::setupFastTimerService(), sigmon_sem_, edm::event_processor::sInvalid, stor::utils::sleep(), sorRef_, edm::event_processor::sRunning, evf::Vulture::start(), start_lock_, startSignalMonitorWorkLoop(), startSummarizeWorkLoop(), edm::EventProcessor::stateName(), stop_lock_, subs_, vp_, and vulture_.

Referenced by sigmon().

494 {
495  nbTotalDQM_ = 0;
496  scalersUpdates_ = 0;
497  idleProcStats_ = 0;
498  allProcStats_ = 0;
499 // std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
500 // << ((instance_.value_==0) ? 0x8 : 0) << " "
501 // << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
502 // << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
503 // << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
504  unsigned short smap
505  = (datasetCounting_.value_ ? 0x20 : 0 )
506  + ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
507  + (((instance_.value_%80)==0) ? 0x8 : 0) // have at least one legend per slice
508  + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
509  + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
510  + (hasPrescaleService_.value_ ? 0x1 : 0);
511 
512  LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling...");
513 
514  //reset core limit size
516  setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
518  crashesThisRun_=0;
519  //recreate signal monitor sem
520  sem_destroy(sigmon_sem_);
521  sem_init(sigmon_sem_,true,0);
522 
523  if(!epInitialized_){
525  }
526  std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
527 
530 
531  if(!epInitialized_){
534  if(cpustat_) {delete cpustat_; cpustat_=0;}
536  nbSubProcesses_.value_,
537  instance_.value_,
538  iDieUrl_.value_);
539  if(ratestat_) {delete ratestat_; ratestat_=0;}
540  ratestat_ = new RateStat(iDieUrl_.value_);
541  if(iDieStatisticsGathering_.value_)
542  {
543  try
544  {
546  xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
547  if(legenda !=0)
548  {
549  std::string slegenda = ((xdata::String*)legenda)->value_;
550  ratestat_->sendLegenda(slegenda);
551  }
552  if (sorRef_ && datasetCounting_.value_)
553  {
554  xdata::String dsLegenda = sorRef_->getDatasetCSV();
555  if (dsLegenda.value_.size())
556  ratestat_->sendAuxLegenda(dsLegenda);
557  }
558  }
559  catch(evf::Exception &e)
560  {
561  LOG4CPLUS_INFO(getApplicationLogger(),"could not send legenda"
562  << e.what());
563  }
564  catch (xcept::Exception& e) {
565  LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda."
566  << e.what());
567  }
568  }
569  epInitialized_ = true;
570  }
571  configuration_ = evtProcessor_.configuration(); // get it again after init has been carried out...
573  //classic appl will return here
574  if(nbSubProcesses_.value_==0) return enableClassic();
575  //protect manipulation of subprocess array
576  pthread_mutex_lock(&start_lock_);
577  pthread_mutex_lock(&pickup_lock_);
578  subs_.clear();
579  subs_.resize(nbSubProcesses_.value_); // this should not be necessary
580  pid_t retval = -1;
581 
582  for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
583  {
584  subs_[i]=SubProcess(i,retval); //this will replace all the scattered variables
585  }
586  pthread_mutex_unlock(&pickup_lock_);
587 
588  pthread_mutex_unlock(&start_lock_);
589 
590  //set expected number of child EP's for the Init message(s) sent to the SM
591  try {
592  if (sorRef_) {
593  unsigned int nbExpectedEPWriters = nbSubProcesses_.value_;
594  if (nbExpectedEPWriters==0) nbExpectedEPWriters=1;//master instance processing
595  std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
596  for (unsigned int i=0;i<shmOutputs.size();i++) {
597  shmOutputs[i]->setNExpectedEPs(nbExpectedEPWriters);
598  }
599  }
600  }
601  catch (...)
602  {
603  LOG4CPLUS_ERROR(getApplicationLogger(),"Thrown Exception while setting nExpectedEPs in shmOutputs");
604  }
605 
606  //use new method if configured
607  edm_init_done_=true;
608  if (forkInEDM_.value_) {
609  edm_init_done_=false;
610  enableForkInEDM();
611  }
612  else
613  for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
614  {
615  retval = subs_[i].forkNew();
616  if(retval==0)
617  {
618  myProcess_ = &subs_[i];
619  // dirty hack: delete/recreate global binary semaphore for later use in child
621  toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
622  int retval = pthread_mutex_destroy(&stop_lock_);
623  if(retval != 0) perror("error");
624  retval = pthread_mutex_init(&stop_lock_,0);
625  if(retval != 0) perror("error");
627 
628  return enableMPEPSlave();
629  // the loop is broken in the child
630  }
631  }
632 
633  if (forkInEDM_.value_) {
634 
636  while (!edm_init_done_) {
637  usleep(10000);
638  st = evtProcessor_->getState();
640  }
641  st = evtProcessor_->getState();
642  //error handling: EP must fork during sRunning
644  reasonForFailedState_ = std::string("Master edm::EventProcessor in state ") + evtProcessor_->stateName(st);
647  return false;
648  }
649 
650  sleep(1);
652  startSignalMonitorWorkLoop();//only with new forking
653  vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
654 
655  //enable after we are done with conditions loading and forking
656  LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
657  fsm_.fireEvent("EnableDone",this);
658  localLog("-I- Start completed");
659  return false;
660  }
661 
663  vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
664  LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
665  fsm_.fireEvent("EnableDone",this);
666  localLog("-I- Start completed");
667  return false;
668 }
xdata::Boolean hasModuleWebRegistry_
ShmOutputModuleRegistry * sorRef_
int i
Definition: DBlmapReader.cc:9
event_processor::State getState() const
xdata::Boolean datasetCounting_
xdata::UnsignedInteger32 runNumber_
pthread_mutex_t start_lock_
ShmOutputModuleRegistry * getShmOutputModuleRegistry()
Definition: FWEPWrapper.cc:491
xdata::UnsignedInteger32 forkInEDM_
void sendAuxLegenda(const std::string &)
Definition: RateStat.cc:29
xdata::Boolean hasServiceWebRegistry_
void init(unsigned short, std::string &)
Definition: FWEPWrapper.cc:193
void setupFastTimerService(unsigned int nProcesses)
Definition: FWEPWrapper.cc:497
xdata::InfoSpace * scalersLegendaInfoSpace_
void forceInitEventProcessorMaybe()
Definition: FWEPWrapper.h:68
unsigned int crashesThisRun_
void sleep(Duration_t)
Definition: Utils.h:163
std::vector< SubProcess > subs_
void fireFailed(const std::string &errorMsg, void *originator)
evf::StateMachine fsm_
ModuleWebRegistry * getModuleWebRegistry()
Definition: FWEPWrapper.cc:485
std::string reasonForFailedState_
void resetLumiSectionReferenceIndex()
Definition: FWEPWrapper.h:137
unsigned long long idleProcStats_
xdata::Boolean iDieStatisticsGathering_
int start(std::string, int=0)
Definition: Vulture.cc:241
std::vector< edm::FUShmOutputModule * > getShmOutputModules()
xdata::Boolean hasPrescaleService_
unsigned int getNumberOfMicrostates()
Definition: FWEPWrapper.h:140
char const * stateName(event_processor::State s) const
void fireEvent(const std::string &evtType, void *originator)
void disableRcmsStateNotification()
Definition: StateMachine.h:64
toolbox::BSem * _s_mutex_ptr_
void localLog(std::string)
ModuleWebRegistry * mwrRef_
xdata::UnsignedInteger32 nbSubProcesses_
xdata::String configString_
unsigned int scalersUpdates_
std::vector< std::string > const & getmicromap() const
Definition: FWEPWrapper.h:141
unsigned long long allProcStats_
std::string const & configuration() const
Definition: FWEPWrapper.h:102
void sendLegenda(const std::vector< std::string > &)
Definition: CPUStat.cc:39
pthread_mutex_t pickup_lock_
void sendLegenda(const std::string &)
Definition: RateStat.cc:24
xdata::UnsignedInteger32 instance_
pthread_mutex_t stop_lock_
xdata::Boolean epInitialized_
void FUEventProcessor::forkProcessesFromEDM ( )
private

Definition at line 2018 of file FUEventProcessor.cc.

References toolbox::mem::_s_mutex_ptr_, evf::FWEPWrapper::adjustLsIndexForRestart(), applicationInfoSpace_, attachDqmToShm(), gather_cfg::cout, evf::StateMachine::disableRcmsStateNotification(), alignCSCRings::e, edm_init_done_, evf::evfep_alarmhandler(), evf::evfep_sighandler(), evtProcessor_, cppFunctionSkipper::exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), forkInfoObj_, evf::moduleweb::ForkInfoObj::forkParams, fsm_, edm::PresenceFactory::get(), evf::ShmOutputModuleRegistry::getShmOutputModules(), hasShMem_, i, evf::moduleweb::ForkParams::isMaster, evf::FWEPWrapper::isWaitingForLs(), localLog(), edm::PresenceFactory::makePresence(), messageServicePresence_, myProcess_, nbSubProcesses_, reasonForFailedState_, evf::FWEPWrapper::resetPackedTriggerReport(), evf::moduleweb::ForkParams::restart, restart_in_progress_, hitfit::return, scalersUpdates_, ML::MLlog4cplus::setAppl(), evf::moduleweb::ForkParams::slotId, sorRef_, errorMatrix2Lands_multiChannel::start, startReceivingLoop(), startReceivingMonitorLoop(), startScalersWorkLoop(), evf::StateMachine::stateName(), stop_lock_, subs_, wlReceiving_, and wlReceivingMonitor_.

2018  {
2019 
2020  moduleweb::ForkParams * forkParams = &(forkInfoObj_->forkParams);
2021  unsigned int forkFrom=0;
2022  unsigned int forkTo=nbSubProcesses_.value_;
2023  if (forkParams->slotId>=0) {
2024  forkFrom=forkParams->slotId;
2025  forkTo=forkParams->slotId+1;
2026  }
2027 
2028  //before fork, make sure to disconnect output modules from Shm
2029  try {
2030  if (sorRef_) {
2031  std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
2032  for (unsigned int i=0;i<shmOutputs.size();i++) {
2033  //unregister PID from ShmBuffer/RB
2034  shmOutputs[i]->unregisterFromShm();
2035  //disconnect from Shm
2036  shmOutputs[i]->stop();
2037  }
2038  }
2039  }
2040  catch (std::exception &e)
2041  {
2042  reasonForFailedState_ = (std::string)"Thrown exception while disconnecting ShmOutputModule from Shm: " + e.what();
2043  LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
2046  }
2047  catch (...) {
2048  reasonForFailedState_ = "Thrown unknown exception while disconnecting ShmOutputModule from Shm: ";
2049  LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
2052  }
2053 
2054  std::string currentState = fsm_.stateName()->toString();
2055 
2056  //destroy MessageServicePresence thread before fork
2057  if (currentState!="stopping") {
2058  try {
2059  messageServicePresence_.reset();
2060  }
2061  catch (...) {
2062  LOG4CPLUS_ERROR(getApplicationLogger(),"Unable to destroy MessageService thread before fork!");
2063  }
2064  }
2065 
2066  if (currentState=="stopping") {
2067  LOG4CPLUS_ERROR(getApplicationLogger(),"Can not fork subprocesses in state " << fsm_.stateName()->toString());
2068  forkParams->isMaster=1;
2071  }
2072  //fork loop
2073  else for(unsigned int i=forkFrom; i<forkTo; i++)
2074  {
2075  int retval = subs_[i].forkNew();
2076  if(retval==0)
2077  {
2078  forkParams->isMaster=0;
2079  myProcess_ = &subs_[i];
2080  // dirty hack: delete/recreate global binary semaphore for later use in child
2082  toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
2083  int retval = pthread_mutex_destroy(&stop_lock_);
2084  if(retval != 0) perror("error");
2085  retval = pthread_mutex_init(&stop_lock_,0);
2086  if(retval != 0) perror("error");
2088 
2089  //recreate MessageLogger thread in slave after fork
2090  try{
2092  if(pf != 0) {
2093  messageServicePresence_ = pf->makePresence("MessageServicePresence");
2094  }
2095  else {
2096  LOG4CPLUS_ERROR(getApplicationLogger(),
2097  "SLAVE: Unable to create message service presence. pid:"<<getpid());
2098  }
2099  }
2100  catch(...) {
2101  LOG4CPLUS_ERROR(getApplicationLogger(),"SLAVE: Unknown Exception in MessageServicePresence. pid:"<<getpid());
2102  }
2103 
2105 
2106  //reconnect to Shm from output modules
2107  try {
2108  if (sorRef_) {
2109  std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
2110  for (unsigned int i=0;i<shmOutputs.size();i++)
2111  shmOutputs[i]->start();
2112  }
2113  }
2114  catch (...)
2115  {
2116  LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception (ShmOutputModule sending InitMsg (pid:"<<getpid() <<")");
2117  }
2118 
2119  if (forkParams->restart) {
2120  //do restart things
2121  scalersUpdates_ = 0;
2122  try {
2123  fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
2124  fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition
2125  fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition
2126  } catch (...) {
2127  LOG4CPLUS_WARN(getApplicationLogger(),"Failed to Stop/Enable FSM of the restarted slave EP");
2128  }
2129  try{
2130  xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
2131  if(lsid) {
2132  ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died
2133  }
2134  }
2135  catch(...){
2136  std::cout << "trouble with lsindex during restart" << std::endl;
2137  }
2138  try{
2139  xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
2140  if(lstb) {
2141  ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting
2142  }
2143  }
2144  catch(...){
2145  std::cout << "trouble with resetting flag for eol recovery " << std::endl;
2146  }
2149  }
2150 
2151  //start other threads
2155  while(!evtProcessor_.isWaitingForLs())
2156  ::usleep(100000);//wait for scalers loop to start
2157 
2158  //connect DQMShmOutputModule
2159  if(hasShMem_) attachDqmToShm();
2160 
2161  //catch transition error if we are already Enabled
2162  try {
2163  fsm_.fireEvent("EnableDone",this);
2164  }
2165  catch (...) {}
2166 
2167  //make sure workloops are started
2168  while (!wlReceiving_->isActive() || !wlReceivingMonitor_->isActive()) usleep(10000);
2169 
2170  //unmask signals
2171  sigset_t tmpset_thread;
2172  sigemptyset(&tmpset_thread);
2173  sigaddset(&tmpset_thread, SIGQUIT);
2174  sigaddset(&tmpset_thread, SIGILL);
2175  sigaddset(&tmpset_thread, SIGABRT);
2176  sigaddset(&tmpset_thread, SIGFPE);
2177  sigaddset(&tmpset_thread, SIGSEGV);
2178  sigaddset(&tmpset_thread, SIGALRM);
2179  //sigprocmask(SIG_UNBLOCK, &tmpset_thread, 0);
2180  pthread_sigmask(SIG_UNBLOCK,&tmpset_thread,0);
2181 
2182  //set signal handlers
2183  struct sigaction sa;
2184  sigset_t tmpset;
2185  memset(&tmpset,0,sizeof(tmpset));
2186  sigemptyset(&tmpset);
2187  sa.sa_mask=tmpset;
2188  sa.sa_flags=SA_RESETHAND | SA_SIGINFO;
2189  sa.sa_handler=0;
2190  sa.sa_sigaction=evfep_sighandler;
2191 
2192  sigaction(SIGQUIT,&sa,0);
2193  sigaction(SIGILL,&sa,0);
2194  sigaction(SIGABRT,&sa,0);
2195  sigaction(SIGFPE,&sa,0);
2196  sigaction(SIGSEGV,&sa,0);
2197  sa.sa_sigaction=evfep_alarmhandler;
2198  sigaction(SIGALRM,&sa,0);
2199 
2200  //child return to DaqSource
2201  return ;
2202  }
2203  else {
2204 
2205  forkParams->isMaster=1;
2207  if (forkParams->restart) {
2208  std::ostringstream ost1;
2209  ost1 << "-I- New Process " << retval << " forked for slot " << forkParams->slotId;
2210  localLog(ost1.str());
2211  }
2213  //start "crash" receiver workloop
2214  }
2215  }
2216 
2217  //recreate MessageLogger thread after fork
2218  try{
2219  //release the presense factory in master
2221  if(pf != 0) {
2222  messageServicePresence_ = pf->makePresence("MessageServicePresence");
2223  }
2224  else {
2225  LOG4CPLUS_ERROR(getApplicationLogger(),
2226  "Unable to recreate message service presence ");
2227  }
2228  }
2229  catch(...) {
2230  LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception in MessageServicePresence");
2231  }
2232  restart_in_progress_=false;
2233  edm_init_done_=true;
2234 }
ShmOutputModuleRegistry * sorRef_
int i
Definition: DBlmapReader.cc:9
toolbox::task::WorkLoop * wlReceiving_
xdata::Boolean hasShMem_
void evfep_sighandler(int sig, siginfo_t *info, void *c)
std::vector< SubProcess > subs_
void fireFailed(const std::string &errorMsg, void *originator)
void adjustLsIndexForRestart()
Definition: FWEPWrapper.h:112
evf::StateMachine fsm_
static void setAppl(xdaq::Application *app)
Definition: MLlog4cplus.cc:85
std::string reasonForFailedState_
void resetPackedTriggerReport()
Definition: FWEPWrapper.h:111
std::vector< edm::FUShmOutputModule * > getShmOutputModules()
moduleweb::ForkInfoObj * forkInfoObj_
void fireEvent(const std::string &evtType, void *originator)
void disableRcmsStateNotification()
Definition: StateMachine.h:64
toolbox::BSem * _s_mutex_ptr_
static PresenceFactory * get()
void localLog(std::string)
xdata::InfoSpace * applicationInfoSpace_
xdata::UnsignedInteger32 nbSubProcesses_
std::auto_ptr< Presence > makePresence(std::string const &presence_type) const
xdata::String * stateName()
Definition: StateMachine.h:69
unsigned int scalersUpdates_
std::auto_ptr< edm::Presence > messageServicePresence_
bool isWaitingForLs()
Definition: FWEPWrapper.h:135
tuple cout
Definition: gather_cfg.py:121
toolbox::task::WorkLoop * wlReceivingMonitor_
pthread_mutex_t stop_lock_
void evfep_alarmhandler(int sig, siginfo_t *info, void *c)
void FUEventProcessor::forkProcessFromEDM_helper ( void *  addr)
static

Definition at line 2014 of file FUEventProcessor.cc.

Referenced by enableForkInEDM().

2014  {
2015  ((FUEventProcessor*)addr)->forkProcessesFromEDM();
2016 }
xoap::MessageReference FUEventProcessor::fsmCallback ( xoap::MessageReference  msg)
throw (xoap::exception::Exception
)

Definition at line 802 of file FUEventProcessor.cc.

References evf::StateMachine::commandCallback(), fsm_, and lumiQueryAPI::msg.

804 {
805  return fsm_.commandCallback(msg);
806 }
xoap::MessageReference commandCallback(xoap::MessageReference msg)
Definition: StateMachine.cc:71
evf::StateMachine fsm_
void FUEventProcessor::getSlavePids ( xgi::Input * in,
xgi::Output * out 
)

Definition at line 844 of file FUEventProcessor.cc.

References i, and subs_.

Referenced by FUEventProcessor().

845 {
846  for (unsigned int i=0;i<subs_.size();i++)
847  {
848  if (i!=0) *out << ",";
849  *out << subs_[i].pid();
850  }
851 }
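unsigned int i
Local loop index declared in the for statement at line 846. Doxygen mis-linked this name to an unrelated global in DBlmapReader.cc:9.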
std::vector< SubProcess > subs_
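xgi::Output * out
Function parameter of getSlavePids(). Doxygen mis-linked this name to the unrelated Python symbol dbtoconf.out (dbtoconf.py:99).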
bool FUEventProcessor::halting ( toolbox::task::WorkLoop *  wl)

Definition at line 765 of file FUEventProcessor.cc.

References doEndRunInEDM(), alignCSCRings::e, evtProcessor_, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), forkInEDM_, forkInfoObj_, fsm_, localLog(), nbSubProcesses_, reasonForFailedState_, rlimit_coresize_default_, sigmon_sem_, evf::FWEPWrapper::stopAndHalt(), and stopSlavesAndAcknowledge().

766 {
767  LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ...");
768  setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
769  if(nbSubProcesses_.value_!=0) {
771  if (forkInEDM_.value_) {
772  sem_post(sigmon_sem_);
773  if (!doEndRunInEDM())
774  return false;
775 
776  }
777  }
778  try{
780  //cleanup forking variables
781  if (forkInfoObj_) {
782  delete forkInfoObj_;
783  forkInfoObj_=0;
784  }
785  }
786  catch (evf::Exception &e) {
787  reasonForFailedState_ = "halting FAILED: " + (std::string)e.what();
790  }
791  // if(hasShMem_) detachDqmFromShm();
792 
793  LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!");
794  fsm_.fireEvent("HaltDone",this);
795 
796  localLog("-I- Halt completed");
797  return false;
798 }
xdata::UnsignedInteger32 forkInEDM_
void fireFailed(const std::string &errorMsg, void *originator)
evf::StateMachine fsm_
std::string reasonForFailedState_
moduleweb::ForkInfoObj * forkInfoObj_
void fireEvent(const std::string &evtType, void *originator)
void localLog(std::string)
xdata::UnsignedInteger32 nbSubProcesses_
void FUEventProcessor::handleSignalSlave ( int  sig,
siginfo_t *  info,
void *  c 
)

Definition at line 2709 of file FUEventProcessor.cc.

References gather_cfg::cout, rlimit_coresize_changed_, sigmon_sem_, stor::utils::sleep(), and stacktrace().

Referenced by evf::evfep_sighandler().

2710 {
2711  //notify master
2712  sem_post(sigmon_sem_);
2713 
2714  //sleep while master takes action
2715  sleep(2);
2716 
2717  //set up alarm if handler deadlocks on unsafe actions
2718  alarm(5);
2719 
2720  std::cout << "--- Slave EP signal handler caught signal " << sig << " process id is " << info->si_pid <<" ---" << std::endl;
2721  std::cout << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl;
2722  std::cout << "--- Stacktrace follows --" << std::endl;
2723  std::ostringstream stacktr;
2724  toolbox::stacktrace(20,stacktr);
2725  std::cout << stacktr.str();
2727  std::cout << "--- Dumping core." << " --- " << std::endl;
2728  else
2729  std::cout << "--- Core dump count exceeded on this FU. ---"<<std::endl;
2730 
2731  std::string hasdump = "";
2732  if (rlimit_coresize_changed_) hasdump = " (core dump disabled) ";
2733 
2734  LOG4CPLUS_ERROR(getApplicationLogger(), "--- Slave EP signal handler caught signal " << sig << ". process id is " << getpid()
2735  << " on node " << toolbox::net::getHostName() << " ---" << std::endl
2736  << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl
2737  << "--- Stacktrace follows"<< hasdump << " ---" << std::endl << stacktr.str()
2738  );
2739 
2740  //re-raise signal with default handler (will cause core dump if enabled)
2741  raise(sig);
2742 }
void sleep(Duration_t)
Definition: Utils.h:163
int stacktrace(void *addresses[], int nmax)
tuple cout
Definition: gather_cfg.py:121
void FUEventProcessor::localLog ( std::string  m)
private

Definition at line 1116 of file FUEventProcessor.cc.

References logRing_, logRingIndex_, logRingSize_, logWrap_, m, and cond::timestamp.

Referenced by configuring(), enableClassic(), enableCommon(), enableMPEPSlave(), enabling(), forkProcessesFromEDM(), halting(), stopClassic(), stopSlavesAndAcknowledge(), and supervisor().

1117 {
1118  timeval tv;
1119 
1120  gettimeofday(&tv,0);
1121  tm *uptm = localtime(&tv.tv_sec);
1122  char datestring[256];
1123  strftime(datestring, sizeof(datestring),"%c", uptm);
1124 
1125  if(logRingIndex_ == 0){logWrap_ = true; logRingIndex_ = logRingSize_;}
1126  logRingIndex_--;
1127  std::ostringstream timestamp;
1128  timestamp << " at " << datestring;
1129  m += timestamp.str();
1131 }
std::vector< std::string > logRing_
static const unsigned int logRingSize_
std::string FUEventProcessor::logsAsString ( )
private

Definition at line 1099 of file FUEventProcessor.cc.

References i, logRing_, logRingIndex_, and logWrap_.

1100 {
1101  std::ostringstream oss;
1102  if(logWrap_)
1103  {
1104  for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
1105  oss << logRing_[i] << std::endl;
1106  for(unsigned int i = 0; i < logRingIndex_; i++)
1107  oss << logRing_[i] << std::endl;
1108  }
1109  else
1110  for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
1111  oss << logRing_[i] << std::endl;
1112 
1113  return oss.str();
1114 }
int i
Definition: DBlmapReader.cc:9
std::vector< std::string > logRing_
void FUEventProcessor::makeStaticInfo ( )
private

Definition at line 2674 of file FUEventProcessor.cc.

References applicationInfoSpace_, evf::utils::cDiv(), alignCSCRings::e, edm::hlt::Exception, edm::getReleaseVersion(), hasModuleWebRegistry_, hasServiceWebRegistry_, hasShMem_, evf::utils::mDiv(), outPut_, and updaterStatic_.

Referenced by FUEventProcessor().

2675 {
2676  using namespace utils;
2677  std::ostringstream ost;
2678  mDiv(&ost,"ve");
2679  ost<< "$Revision: 1.162 $ (" << edm::getReleaseVersion() <<")";
2680  cDiv(&ost);
2681  mDiv(&ost,"ou",outPut_.toString());
2682  mDiv(&ost,"sh",hasShMem_.toString());
2683  mDiv(&ost,"mw",hasModuleWebRegistry_.toString());
2684  mDiv(&ost,"sw",hasServiceWebRegistry_.toString());
2685 
2686  xdata::Serializable *monsleep = 0;
2687  xdata::Serializable *lstimeout = 0;
2688  try{
2689  monsleep = applicationInfoSpace_->find("monSleepSec");
2690  lstimeout = applicationInfoSpace_->find("lsTimeOut");
2691  }
2693  }
2694 
2695  if(monsleep!=0)
2696  mDiv(&ost,"ms",monsleep->toString());
2697  if(lstimeout!=0)
2698  mDiv(&ost,"lst",lstimeout->toString());
2699  char cbuf[sizeof(struct utsname)];
2700  struct utsname* buf = (struct utsname*)cbuf;
2701  uname(buf);
2702  mDiv(&ost,"sysinfo");
2703  ost << buf->sysname << " " << buf->nodename
2704  << " " << buf->release << " " << buf->version << " " << buf->machine;
2705  cDiv(&ost);
2706  updaterStatic_ = ost.str();
2707 }
xdata::Boolean hasModuleWebRegistry_
xdata::Boolean hasServiceWebRegistry_
xdata::Boolean hasShMem_
void cDiv(std::ostringstream *out)
Definition: procUtils.cc:357
std::string getReleaseVersion()
xdata::InfoSpace * applicationInfoSpace_
void mDiv(std::ostringstream *out, std::string name)
Definition: procUtils.cc:354
void FUEventProcessor::microState ( xgi::Input *  in,
xgi::Output *  out 
)

Definition at line 2480 of file FUEventProcessor.cc.

References autoRestartSlaves_, gather_cfg::cout, alignCSCRings::e, evtProcessor_, cppFunctionSkipper::exception, fsm_, evf::FWEPWrapper::getScalersUpdates(), i, evf::FWEPWrapper::microState(), evf::FWEPWrapper::moduleNameFromIndex(), myProcess_, nbdead_, nblive_, nbSubProcesses_, nbTotalDQM_, pickup_lock_, start_lock_, evf::StateMachine::stateName(), evf::FWEPWrapper::stateNameFromIndex(), subs_, and cms::Exception::what().

Referenced by FUEventProcessor().

2481 {
2482  std::string urn = getApplicationDescriptor()->getURN();
2483  try{
2486  if(myProcess_) {std::cout << "microstate called for child! bail out" << std::endl; return;}
2487  *out << "<tr><td>" << fsm_.stateName()->toString()
2488  << "</td><td>"<< (myProcess_ ? "S" : "M") <<"</td><td>" << nblive_ << "</td><td>"
2489  << nbdead_ << "</td><td><a href=\"/" << urn << "/procStat\">" << getpid() <<"</a></td>";
2491  *out << "<td></td><td>" << nbTotalDQM_
2492  << "</td><td>" << evtProcessor_.getScalersUpdates() << "</td></tr>";
2493  if(nbSubProcesses_.value_!=0 && !myProcess_)
2494  {
2495  pthread_mutex_lock(&start_lock_);
2496  for(unsigned int i = 0; i < subs_.size(); i++)
2497  {
2498  try{
2499  if(subs_[i].alive()>0)
2500  {
2501  *out << "<tr><td bgcolor=\"#00FF00\" id=\"a"
2502  << i << "\">""Alive</td><td>S</td><td>"
2503  << subs_[i].queueId() << "<td>"
2504  << subs_[i].queueStatus()<< "/"
2505  << subs_[i].queueOccupancy() << "/"
2506  << subs_[i].queuePidOfLastSend() << "/"
2507  << subs_[i].queuePidOfLastReceive()
2508  << "</td><td><a id=\"p"<< i << "\" href=\"SubWeb?process="
2509  << subs_[i].pid() << "&method=procStat\">"
2510  << subs_[i].pid()<<"</a></td>" //<< msg->mtext;
2511  << "<td>" << evtProcessor_.stateNameFromIndex(subs_[i].params().Ms) << "</td><td>"
2512  << evtProcessor_.moduleNameFromIndex(subs_[i].params().ms) << "</td><td>"
2513  << subs_[i].params().nba << "/" << subs_[i].params().nbp
2514  << " (" << float(subs_[i].params().nba)/float(subs_[i].params().nbp)*100. <<"%)"
2515  << "</td><td>" << subs_[i].params().ls << "/" << subs_[i].params().ls
2516  << "</td><td>" << subs_[i].params().ps
2517  << "</td><td"
2518  << ((subs_[i].params().eols<subs_[i].params().ls) ? " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"") << ">"
2519  << subs_[i].params().eols
2520  << "</td><td>" << subs_[i].params().dqm
2521  << "</td><td>" << subs_[i].params().trp << "</td>";
2522  }
2523  else
2524  {
2525  pthread_mutex_lock(&pickup_lock_);
2526  *out << "<tr><td id=\"a"<< i << "\" ";
2527  if(subs_[i].alive()==-1000)
2528  *out << " bgcolor=\"#bbaabb\">NotInitialized";
2529  else
2530  *out << (subs_[i].alive()==0 ? ">Done" : " bgcolor=\"#FF0000\">Dead");
2531  *out << "</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
2532  << subs_[i].queueStatus() << "/"
2533  << subs_[i].queueOccupancy() << "/"
2534  << subs_[i].queuePidOfLastSend() << "/"
2535  << subs_[i].queuePidOfLastReceive()
2536  << "</td><td id=\"p"<< i << "\">"
2537  <<subs_[i].pid()<<"</td><td colspan=\"5\">" << subs_[i].reasonForFailed();
2538  if(subs_[i].alive()!=0 && subs_[i].alive()!=-1000)
2539  {
2540  if(autoRestartSlaves_ && subs_[i].restartCount()<=2)
2541  *out << " will restart in " << subs_[i].countdown() << " s";
2542  else if(autoRestartSlaves_)
2543  *out << " reached maximum restart count";
2544  else *out << " autoRestart is disabled ";
2545  }
2546  *out << "</td><td"
2547  << ((subs_[i].params().eols<subs_[i].params().ls) ?
2548  " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")
2549  << ">"
2550  << subs_[i].params().eols
2551  << "</td><td>" << subs_[i].params().dqm
2552  << "</td><td>" << subs_[i].params().trp << "</td>";
2553  pthread_mutex_unlock(&pickup_lock_);
2554  }
2555  *out << "</tr>";
2556  }
2557  catch(evf::Exception &e){
2558  *out << "<tr><td id=\"a"<< i << "\" "
2559  <<"bgcolor=\"#FFFF00\">NotResp</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
2560  << subs_[i].queueStatus() << "/"
2561  << subs_[i].queueOccupancy() << "/"
2562  << subs_[i].queuePidOfLastSend() << "/"
2563  << subs_[i].queuePidOfLastReceive()
2564  << "</td><td id=\"p"<< i << "\">"
2565  <<subs_[i].pid()<<"</td>";
2566  }
2567  }
2568  pthread_mutex_unlock(&start_lock_);
2569  }
2570  }
2571  catch(evf::Exception &e)
2572  {
2573  LOG4CPLUS_INFO(getApplicationLogger(),"evf::Exception caught in microstate - " << e.what());
2574  }
2575  catch(cms::Exception &e)
2576  {
2577  LOG4CPLUS_INFO(getApplicationLogger(),"cms::Exception caught in microstate - " << e.what());
2578  }
2579  catch(std::exception &e)
2580  {
2581  LOG4CPLUS_INFO(getApplicationLogger(),"std::Exception caught in microstate - " << e.what());
2582  }
2583  catch(...)
2584  {
2585  LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception caught in microstate - ");
2586  }
2587 
2588 }
virtual char const * what() const
Definition: Exception.cc:141
int i
Definition: DBlmapReader.cc:9
pthread_mutex_t start_lock_
xdata::Boolean autoRestartSlaves_
std::string const & moduleNameFromIndex(unsigned int i) const
Definition: FWEPWrapper.h:122
std::vector< SubProcess > subs_
evf::StateMachine fsm_
tuple out
Definition: dbtoconf.py:99
xdata::UnsignedInteger32 nbSubProcesses_
xdata::String * stateName()
Definition: StateMachine.h:69
std::string const & stateNameFromIndex(unsigned int i) const
Definition: FWEPWrapper.h:127
pthread_mutex_t pickup_lock_
tuple cout
Definition: gather_cfg.py:121
void microState(xgi::Input *in, xgi::Output *out)
unsigned int getScalersUpdates()
Definition: FWEPWrapper.h:136
void evf::FUEventProcessor::moduleWeb ( xgi::Input *  in,
xgi::Output *  out 
)
inline

Definition at line 114 of file FUEventProcessor.h.

References evtProcessor_, and evf::FWEPWrapper::moduleWeb().

Referenced by FUEventProcessor(), and receivingAndMonitor().

void moduleWeb(xgi::Input *in, xgi::Output *out)
tuple out
Definition: dbtoconf.py:99
void FUEventProcessor::pathNames ( xgi::Input *  in,
xgi::Output *  out 
)
throw (xgi::exception::Exception)

Definition at line 1035 of file FUEventProcessor.cc.

References evtProcessor_, dbtoconf::out, and scalersLegendaInfoSpace_.

Referenced by FUEventProcessor().

1037 {
1038 
1039  if(evtProcessor_ != 0){
1040  xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
1041  if(legenda !=0){
1042  std::string slegenda = ((xdata::String*)legenda)->value_;
1043  *out << slegenda << std::endl;
1044  }
1045  }
1046 }
xdata::InfoSpace * scalersLegendaInfoSpace_
tuple out
Definition: dbtoconf.py:99
void FUEventProcessor::procStat ( xgi::Input *  in,
xgi::Output *  out 
)

Definition at line 2664 of file FUEventProcessor.cc.

References evf::utils::procStat().

Referenced by FUEventProcessor(), and receivingAndMonitor().

2665 {
2667 }
void procStat(std::ostringstream *out)
Definition: procUtils.cc:176
tuple out
Definition: dbtoconf.py:99
bool FUEventProcessor::receiving ( toolbox::task::WorkLoop *  wl)
private

Definition at line 1188 of file FUEventProcessor.cc.

References alignCSCRings::e, evf::StateMachine::fireEvent(), fsm_, messageServicePresence_, lumiQueryAPI::msg, MSQM_MESSAGE_TYPE_FSTOP, MSQM_MESSAGE_TYPE_RLI, MSQM_MESSAGE_TYPE_RLR, MSQM_MESSAGE_TYPE_STOP, MSQS_MESSAGE_TYPE_STOP, myProcess_, evf::SubProcess::postSlave(), evf::SubProcess::rcvSlave(), rlimit_coresize_changed_, rlimit_coresize_default_, stop_lock_, and stopClassic().

Referenced by startReceivingLoop().

1189 {
1190  MsgBuf msg;
1191  try{
1192  myProcess_->rcvSlave(msg,false); //will receive only messages from Master
1193  if(msg->mtype==MSQM_MESSAGE_TYPE_RLI)
1194  {
1195  rlimit rl;
1196  getrlimit(RLIMIT_CORE,&rl);
1197  rl.rlim_cur=0;
1198  setrlimit(RLIMIT_CORE,&rl);
1200  }
1201  if (msg->mtype==MSQM_MESSAGE_TYPE_RLR)
1202  {
1203  //reset coresize limit
1204  setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
1206  }
1207  if(msg->mtype==MSQM_MESSAGE_TYPE_STOP)
1208  {
1209  pthread_mutex_lock(&stop_lock_);
1210  try {
1211  fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
1212  }
1213  catch (...) {
1214  LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to go to Stopping state in slave EP, pid "
1215  << getpid() << " The state on Stop event was not consistent");
1216  }
1217 
1218  try {
1219  stopClassic(); // call the normal sequence of stopping - as this is allowed to fail provisions must be made ...@@@EM
1220  }
1221  catch (...) {
1222  LOG4CPLUS_ERROR(getApplicationLogger(),"Slave EP 'receiving' workloop: exception " << getpid());
1223  }
1224 
1225  //destroy MessageService thread before exit
1226  try{
1227  messageServicePresence_.reset();
1228  }
1229  catch(...) {
1230  LOG4CPLUS_ERROR(getApplicationLogger(),"SLAVE:Unable to destroy MessageServicePresence. pid:" << getpid() );
1231  }
1232 
1234  myProcess_->postSlave(msg1,false);
1235  pthread_mutex_unlock(&stop_lock_);
1236  fclose(stdout);
1237  fclose(stderr);
1238  _exit(EXIT_SUCCESS);
1239  }
1240  if(msg->mtype==MSQM_MESSAGE_TYPE_FSTOP)
1241  _exit(EXIT_SUCCESS);
1242  }
1243  catch(evf::Exception &e){
1244  LOG4CPLUS_ERROR(getApplicationLogger(),"Slave EP pid:" << getpid() << " receiving WorkLoop exception: "<<e.what());
1245  }
1246  return true;
1247 }
unsigned long rcvSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:116
#define MSQM_MESSAGE_TYPE_FSTOP
Definition: queue_defs.h:17
#define MSQM_MESSAGE_TYPE_RLR
Definition: queue_defs.h:25
#define MSQS_MESSAGE_TYPE_STOP
Definition: queue_defs.h:31
evf::StateMachine fsm_
#define MSQM_MESSAGE_TYPE_RLI
Definition: queue_defs.h:24
int postSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:110
void fireEvent(const std::string &evtType, void *originator)
std::auto_ptr< edm::Presence > messageServicePresence_
#define MSQM_MESSAGE_TYPE_STOP
Definition: queue_defs.h:16
pthread_mutex_t stop_lock_
bool FUEventProcessor::receivingAndMonitor ( toolbox::task::WorkLoop *  wl)
private

Definition at line 1740 of file FUEventProcessor.cc.

References anonymousPipe_, applicationInfoSpace_, harvestRelVal::args, prof2calltree::count, gather_cfg::cout, cycle, data, defaultWebPage(), evf::prg::dqm, alignCSCRings::e, evf::prg::eols, evf::FWEPWrapper::epMAltState_, evf::FWEPWrapper::epmAltState_, evtProcessor_, edm::hlt::Exception, exitOnError_, spr::find(), first, fsm_, evf::internal::MyCgi::getEnvironment(), recoMuon::in, Input, evf::FWEPWrapper::lastLumiUsingEol_, evf::prg::ls, evf::FWEPWrapper::lsid_, MAX_PIPE_BUFFER_SIZE, PFRecoTauDiscriminationAgainstElectronMVA2_cfi::method, evf::FWEPWrapper::microState(), moduleWeb(), evf::FWEPWrapper::monitoring(), evf::prg::Ms, evf::prg::ms, MSQM_MESSAGE_TYPE_MCS, MSQM_MESSAGE_TYPE_PRG, MSQM_MESSAGE_TYPE_TRP, MSQM_MESSAGE_TYPE_WEB, MSQS_MESSAGE_TYPE_MCR, MSQS_MESSAGE_TYPE_WEB, myProcess_, evf::prg::nba, evf::prg::nbp, NUMERIC_MESSAGE_SIZE, dbtoconf::out, Output, PIPE_WRITE, pos, evf::SubProcess::postSlave(), procStat(), evf::prg::ps, evf::FWEPWrapper::psid_, o2o::query, evf::SubProcess::rcvSlave(), scalersUpdates_, edm::second(), edm::event_processor::sError, slave_message_monitoring_, slave_message_prr_, stor::utils::sleep(), spotlightWebPage(), evf::StateMachine::stateName(), stop_lock_, edm::EventProcessor::totalEvents(), edm::EventProcessor::totalEventsPassed(), evf::prg::trp, and TablePrint::write.

Referenced by startReceivingMonitorLoop().

1741 {
1742  try{
1743  myProcess_->rcvSlave(slave_message_monitoring_,true); //will receive only messages from Master
1744  switch(slave_message_monitoring_->mtype)
1745  {
1746  case MSQM_MESSAGE_TYPE_MCS:
1747  {
1748  xgi::Input *in = 0;
1749  xgi::Output out;
1750  evtProcessor_.microState(in,&out);
1751  MsgBuf msg1(out.str().size(),MSQS_MESSAGE_TYPE_MCR);
1752  strncpy(msg1->mtext,out.str().c_str(),out.str().size());
1753  myProcess_->postSlave(msg1,true);
1754  break;
1755  }
1756 
1757  case MSQM_MESSAGE_TYPE_PRG:
1758  {
1759  xdata::Serializable *dqmp = 0;
1760  xdata::UnsignedInteger32 *dqm = 0;
1762  try{
1763  dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
1764  } catch(xdata::exception::Exception e){}
1765  if(dqmp!=0)
1766  dqm = (xdata::UnsignedInteger32*)dqmp;
1767 
1768  // monitorInfoSpace_->lock();
1769  prg * data = (prg*)slave_message_prr_->mtext;
1770  data->ls = evtProcessor_.lsid_;
1772  data->ps = evtProcessor_.psid_;
1773  data->nbp = evtProcessor_->totalEvents();
1774  data->nba = evtProcessor_->totalEventsPassed();
1775  data->Ms = evtProcessor_.epMAltState_.value_;
1776  data->ms = evtProcessor_.epmAltState_.value_;
1777  if(dqm) data->dqm = dqm->value_; else data->dqm = 0;
1778  data->trp = scalersUpdates_;
1779  // monitorInfoSpace_->unlock();
1781  if(exitOnError_.value_)
1782  {
1783  // after each monitoring cycle check if we are in inconsistent state and exit if configured to do so
1784  // std::cout << getpid() << "receivingAndMonitor: trying to acquire stop lock " << std::endl;
1785  if(data->Ms == edm::event_processor::sError)
1786  {
1787  bool running = true;
1788  int count = 0;
1789  while(running){
1790  int retval = pthread_mutex_lock(&stop_lock_);
1791  if(retval != 0) perror("error");
1792  running = fsm_.stateName()->toString()=="Enabled";
1793  if(count>5) _exit(-1);
1794  pthread_mutex_unlock(&stop_lock_);
1795  if(running) {::sleep(1); count++;}
1796  }
1797  }
1798  }
1799  break;
1800  }
1801  case MSQM_MESSAGE_TYPE_WEB:
1802  {
1803  xgi::Input *in = 0;
1804  xgi::Output out;
1805  unsigned int bytesToSend = 0;
1807  std::string query = slave_message_monitoring_->mtext;
1808  size_t pos = query.find_first_of("&");
1809  std::string method;
1810  std::string args;
1811  if(pos!=std::string::npos)
1812  {
1813  method = query.substr(0,pos);
1814  args = query.substr(pos+1,query.length()-pos-1);
1815  }
1816  else
1817  method=query;
1818 
1819  if(method=="Spotlight")
1820  {
1821  spotlightWebPage(in,&out);
1822  }
1823  else if(method=="procStat")
1824  {
1825  procStat(in,&out);
1826  }
1827  else if(method=="moduleWeb")
1828  {
1829  internal::MyCgi mycgi;
1830  boost::char_separator<char> sep(";");
1831  boost::tokenizer<boost::char_separator<char> > tokens(args, sep);
1832  for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
1833  tok_iter != tokens.end(); ++tok_iter){
1834  size_t pos = (*tok_iter).find_first_of("%");
1835  if(pos != std::string::npos){
1836  std::string first = (*tok_iter).substr(0 , pos);
1837  std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
1838  mycgi.getEnvironment()[first]=second;
1839  }
1840  }
1841  moduleWeb(&mycgi,&out);
1842  }
1843  else if(method=="Default")
1844  {
1845  defaultWebPage(in,&out);
1846  }
1847  else
1848  {
1849  out << "Error 404!!!!!!!!" << std::endl;
1850  }
1851 
1852 
1853  bytesToSend = out.str().size();
1854  unsigned int cycle = 0;
1855  if(bytesToSend==0)
1856  {
1857  snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", bytesToSend);
1858  myProcess_->postSlave(msg1,true);
1859  }
1860  while(bytesToSend !=0){
1861  unsigned int msgSize = bytesToSend>MAX_PIPE_BUFFER_SIZE ? MAX_PIPE_BUFFER_SIZE : bytesToSend;
1863  out.str().c_str()+MAX_PIPE_BUFFER_SIZE*cycle,
1864  msgSize);
1865  snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", msgSize);
1866  myProcess_->postSlave(msg1,true);
1867  bytesToSend -= msgSize;
1868  cycle++;
1869  }
1870  break;
1871  }
1872  case MSQM_MESSAGE_TYPE_TRP:
1873  {
1874  break;
1875  }
1876  }
1877  }
1878  catch(evf::Exception &e){std::cout << "exception caught in recevingM: " << e.what() << std::endl;}
1879  return true;
1880 }
unsigned int ps
Definition: queue_defs.h:60
unsigned long rcvSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:116
void spotlightWebPage(xgi::Input *, xgi::Output *)
unsigned int eols
Definition: queue_defs.h:59
#define Input(cl)
Definition: vmac.h:189
unsigned int nbp
Definition: queue_defs.h:61
xdata::Integer epmAltState_
Definition: FWEPWrapper.h:197
std::map< std::string, std::string, std::less< std::string > > & getEnvironment()
boost::tokenizer< boost::char_separator< char > > tokenizer
unsigned int ls
Definition: queue_defs.h:58
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
void sleep(Duration_t)
Definition: Utils.h:163
xdata::Boolean exitOnError_
U second(std::pair< T, U > const &p)
#define MSQM_MESSAGE_TYPE_PRG
Definition: queue_defs.h:19
#define MSQS_MESSAGE_TYPE_WEB
Definition: queue_defs.h:33
evf::StateMachine fsm_
int cycle
unsigned int lsid_
Definition: FWEPWrapper.h:210
bool monitoring(toolbox::task::WorkLoop *wl)
Definition: FWEPWrapper.cc:647
int postSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:110
unsigned int psid_
Definition: FWEPWrapper.h:211
unsigned int Ms
Definition: queue_defs.h:63
#define NUMERIC_MESSAGE_SIZE
Definition: queue_defs.h:38
bool first
Definition: L1TdeRCT.cc:94
int totalEvents() const
void procStat(xgi::Input *in, xgi::Output *out)
unsigned int nba
Definition: queue_defs.h:62
tuple out
Definition: dbtoconf.py:99
#define MSQM_MESSAGE_TYPE_TRP
Definition: queue_defs.h:21
#define MSQS_MESSAGE_TYPE_MCR
Definition: queue_defs.h:30
xdata::InfoSpace * applicationInfoSpace_
xdata::String * stateName()
Definition: StateMachine.h:69
unsigned int scalersUpdates_
xdata::UnsignedInteger32 lastLumiUsingEol_
Definition: FWEPWrapper.h:222
#define PIPE_WRITE
Definition: queue_defs.h:41
void moduleWeb(xgi::Input *in, xgi::Output *out)
dictionary args
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
#define Output(cl)
Definition: vmac.h:193
void defaultWebPage(xgi::Input *in, xgi::Output *out)
unsigned int trp
Definition: queue_defs.h:66
tuple query
Definition: o2o.py:269
xdata::Integer epMAltState_
Definition: FWEPWrapper.h:196
tuple cout
Definition: gather_cfg.py:121
#define MSQM_MESSAGE_TYPE_MCS
Definition: queue_defs.h:18
#define MSQM_MESSAGE_TYPE_WEB
Definition: queue_defs.h:20
void microState(xgi::Input *in, xgi::Output *out)
#define MAX_PIPE_BUFFER_SIZE
Definition: queue_defs.h:42
int totalEventsPassed() const
pthread_mutex_t stop_lock_
unsigned int dqm
Definition: queue_defs.h:65
unsigned int ms
Definition: queue_defs.h:64
bool FUEventProcessor::restartForkInEDM ( unsigned int  slotId)
private

Definition at line 2300 of file FUEventProcessor.cc.

References evf::moduleweb::ForkInfoObj::control_sem_, prof2calltree::count, forkInfoObj_, evf::moduleweb::ForkInfoObj::forkParams, evf::moduleweb::ForkParams::isMaster, evf::moduleweb::ForkInfoObj::lock(), log_, evf::moduleweb::ForkParams::restart, restart_in_progress_, evf::moduleweb::ForkParams::slotId, evf::moduleweb::ForkInfoObj::stopCondition, and evf::moduleweb::ForkInfoObj::unlock().

Referenced by supervisor().

2300  {
2301  //daqsource will keep this lock until master returns after fork
2302  //so that we don't do another EP restart in between
2303  forkInfoObj_->lock();
2304  forkInfoObj_->forkParams.slotId=slotId;
2308  LOG4CPLUS_DEBUG(log_, " restarting subprocess in slot "<< slotId <<": posting on control semaphore");
2309  sem_post(forkInfoObj_->control_sem_);
2310  forkInfoObj_->unlock();
2311  usleep(1000);
2312  //sleep until fork is performed
2313  int count=50;
2314  restart_in_progress_=true;
2315  while (restart_in_progress_ && count--) usleep(20000);
2316  return true;
2317 }
unsigned int stopCondition
Definition: ModuleWeb.h:46
moduleweb::ForkInfoObj * forkInfoObj_
bool FUEventProcessor::scalers ( toolbox::task::WorkLoop *  wl)
private

Definition at line 1601 of file FUEventProcessor.cc.

References gather_cfg::cout, evtProcessor_, evf::FWEPWrapper::fireScalersUpdate(), evf::FWEPWrapper::getPackedTriggerReport(), evf::FWEPWrapper::getTriggerReport(), myProcess_, evf::SubProcess::postSlave(), run_regression::ret, scalersUpdates_, and wlScalersActive_.

Referenced by startScalersWorkLoop().

1602 {
1603  if(evtProcessor_)
1604  {
1605  if(!evtProcessor_.getTriggerReport(true)) {
1606  wlScalersActive_ = false;
1607  return false;
1608  }
1609  }
1610  else
1611  {
1612  std::cout << getpid()<< " Scalers workloop, bailing out, no evtProcessor " << std::endl;
1613  wlScalersActive_ = false;
1614  return false;
1615  }
1616  if(myProcess_)
1617  {
1618  // std::cout << getpid() << "going to post on control queue from scalers" << std::endl;
1620  if(ret!=0) std::cout << "scalers workloop, error posting to sqs_ " << errno << std::endl;
1621  scalersUpdates_++;
1622  }
1623  else
1625  return true;
1626 }
MsgBuf & getPackedTriggerReport()
Definition: FWEPWrapper.h:114
bool fireScalersUpdate()
Definition: FWEPWrapper.cc:777
int postSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:110
bool getTriggerReport(bool useLock)
Definition: FWEPWrapper.cc:694
unsigned int scalersUpdates_
tuple cout
Definition: gather_cfg.py:121
void FUEventProcessor::scalersWeb ( xgi::Input *  in,
xgi::Output *  out 
)
throw (xgi::exception::Exception)

Definition at line 1022 of file FUEventProcessor.cc.

References evtProcessor_, evf::FWEPWrapper::getPackedTriggerReportAsStruct(), and dbtoconf::out.

Referenced by FUEventProcessor().

1024 {
1025 
1026  out->getHTTPResponseHeader().addHeader( "Content-Type",
1027  "application/octet-stream" );
1028  out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
1029  "binary" );
1030  if(evtProcessor_ != 0){
1032  }
1033 }
tuple out
Definition: dbtoconf.py:99
TriggerReportStatic * getPackedTriggerReportAsStruct()
Definition: FWEPWrapper.h:115
void FUEventProcessor::sendMessageOverMonitorQueue ( MsgBuf &  buf)

Definition at line 2669 of file FUEventProcessor.cc.

References myProcess_, and evf::SubProcess::postSlave().

2670 {
2671  if(myProcess_) myProcess_->postSlave(buf,true);
2672 }
int postSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:110
void evf::FUEventProcessor::serviceWeb ( xgi::Input *  in,
xgi::Output *  out 
)
inline

Definition at line 115 of file FUEventProcessor.h.

References evtProcessor_, and evf::FWEPWrapper::serviceWeb().

Referenced by FUEventProcessor().

void serviceWeb(xgi::Input *in, xgi::Output *out)
tuple out
Definition: dbtoconf.py:99
void FUEventProcessor::setAttachDqmToShm ( )
throw (evf::Exception)
private

Definition at line 1049 of file FUEventProcessor.cc.

References alignCSCRings::e, evtProcessor_, edm::EventProcessor::getToken(), edm::Service< T >::isAvailable(), cmsPerfStripChart::operate(), and cms::Exception::what().

1050 {
1051  std::string errmsg;
1052  try {
1055  edm::Service<FUShmDQMOutputService>()->setAttachToShm();
1056  }
1057  catch (cms::Exception& e) {
1058  errmsg = "Failed to set to attach DQM service to shared memory: " + (std::string)e.what();
1059  }
1060  if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
1061 }
virtual char const * what() const
Definition: Exception.cc:141
bool isAvailable() const
Definition: Service.h:47
ServiceToken getToken()
bool FUEventProcessor::sigmon ( toolbox::task::WorkLoop *  wl)
private

Definition at line 1905 of file FUEventProcessor.cc.

References gather_cfg::cout, crashesThisRun_, crashesToDump_, enabling(), fsm_, i, lastCrashTime_, min, MSQM_MESSAGE_TYPE_RLI, NUMERIC_MESSAGE_SIZE, rlimit_coresize_changed_, sigmon_sem_, signalMonitorActive_, evf::StateMachine::stateName(), stopping(), and subs_.

Referenced by startSignalMonitorWorkLoop().

1906 {
1907  while (1) {
1908  sem_wait(sigmon_sem_);
1909  std::cout << " received signal notification from slave!"<< std::endl;
1910 
1911  //check if shutdown time
1912  bool running = fsm_.stateName()->toString()=="Enabled";
1913  bool stopping = fsm_.stateName()->toString()=="stopping";
1914  bool enabling = fsm_.stateName()->toString()=="enabling";
1915  if (!running && !enabling) {
1916  signalMonitorActive_ = false;
1917  return false;
1918  }
1919 
1920  crashesThisRun_++;
1921  gettimeofday(&lastCrashTime_,0);
1922 
1923  //set core size limit to 0 in master and slaves
1924  if (crashesThisRun_>=crashesToDump_.value_ && (running || stopping) && !rlimit_coresize_changed_) {
1925 
1926  rlimit rlold;
1927  getrlimit(RLIMIT_CORE,&rlold);
1928  rlimit rlnew = rlold;
1929  rlnew.rlim_cur=0;
1930  setrlimit(RLIMIT_CORE,&rlnew);
1932  MsgBuf master_message_rli_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLI);
1933  //in case of frequent crashes, allow first slot to dump (until restart)
1934  unsigned int min=1;
1935  for (unsigned int i = min; i < subs_.size(); i++) {
1936  try {
1937  if (subs_[i].alive()) {
1938  subs_[i].post(master_message_rli_,false);
1939  }
1940  }
1941  catch (...) {}
1942  }
1943  std::ostringstream ostr;
1944  ostr << "Number of recent slave crashes reaches " << crashesThisRun_
1945  << ". Disabling core dumps for next 15 minutes in this FilterUnit";
1946  LOG4CPLUS_WARN(getApplicationLogger(),ostr.str());
1947  }
1948  }//end while loop
1949  signalMonitorActive_ = false;
1950  return false;
1951 }
int i
Definition: DBlmapReader.cc:9
xdata::UnsignedInteger32 crashesToDump_
#define min(a, b)
Definition: mlp_lapack.h:161
unsigned int crashesThisRun_
std::vector< SubProcess > subs_
evf::StateMachine fsm_
#define MSQM_MESSAGE_TYPE_RLI
Definition: queue_defs.h:24
#define NUMERIC_MESSAGE_SIZE
Definition: queue_defs.h:38
bool stopping(toolbox::task::WorkLoop *wl)
bool enabling(toolbox::task::WorkLoop *wl)
xdata::String * stateName()
Definition: StateMachine.h:69
tuple cout
Definition: gather_cfg.py:121
void FUEventProcessor::spotlightWebPage ( xgi::Input * in,
xgi::Output * out 
)
throw (xgi::exception::Exception
)

Definition at line 937 of file FUEventProcessor.cc.

References configuration_, evtProcessor_, fsm_, recoMuon::in, myProcess_, nbSubProcesses_, dbtoconf::out, evf::StateMachine::stateName(), evf::FWEPWrapper::summaryWebPage(), and evf::FWEPWrapper::taskWebPage().

Referenced by FUEventProcessor(), and receivingAndMonitor().

939 {
940 
941  std::string urn = getApplicationDescriptor()->getURN();
942 
943  *out << "<!-- base href=\"/" << urn
944  << "\"> -->" << std::endl;
945  *out << "<html>" << std::endl;
946  *out << "<head>" << std::endl;
947  *out << "<link type=\"text/css\" rel=\"stylesheet\"";
948  *out << " href=\"/evf/html/styles.css\"/>" << std::endl;
949  *out << "<title>" << getApplicationDescriptor()->getClassName()
950  << getApplicationDescriptor()->getInstance()
951  << " MAIN</title>" << std::endl;
952  *out << "</head>" << std::endl;
953  *out << "<body>" << std::endl;
954  *out << "<table border=\"0\" width=\"100%\">" << std::endl;
955  *out << "<tr>" << std::endl;
956  *out << " <td align=\"left\">" << std::endl;
957  *out << " <img" << std::endl;
958  *out << " align=\"middle\"" << std::endl;
959  *out << " src=\"/evf/images/spoticon.jpg\"" << std::endl;
960  *out << " alt=\"main\"" << std::endl;
961  *out << " width=\"64\"" << std::endl;
962  *out << " height=\"64\"" << std::endl;
963  *out << " border=\"\"/>" << std::endl;
964  *out << " <b>" << std::endl;
965  *out << getApplicationDescriptor()->getClassName()
966  << getApplicationDescriptor()->getInstance() << std::endl;
967  *out << " " << fsm_.stateName()->toString() << std::endl;
968  *out << " </b>" << std::endl;
969  *out << " </td>" << std::endl;
970  *out << " <td width=\"32\">" << std::endl;
971  *out << " <a href=\"/urn:xdaq-application:lid=3\">" << std::endl;
972  *out << " <img" << std::endl;
973  *out << " align=\"middle\"" << std::endl;
974  *out << " src=\"/hyperdaq/images/HyperDAQ.jpg\"" << std::endl;
975  *out << " alt=\"HyperDAQ\"" << std::endl;
976  *out << " width=\"32\"" << std::endl;
977  *out << " height=\"32\"" << std::endl;
978  *out << " border=\"\"/>" << std::endl;
979  *out << " </a>" << std::endl;
980  *out << " </td>" << std::endl;
981  *out << " <td width=\"32\">" << std::endl;
982  *out << " </td>" << std::endl;
983  *out << " <td width=\"32\">" << std::endl;
984  *out << " <a href=\"/" << urn << "/\">" << std::endl;
985  *out << " <img" << std::endl;
986  *out << " align=\"middle\"" << std::endl;
987  *out << " src=\"/evf/images/epicon.jpg\"" << std::endl;
988  *out << " alt=\"main\"" << std::endl;
989  *out << " width=\"32\"" << std::endl;
990  *out << " height=\"32\"" << std::endl;
991  *out << " border=\"\"/>" << std::endl;
992  *out << " </a>" << std::endl;
993  *out << " </td>" << std::endl;
994  *out << "</tr>" << std::endl;
995  *out << "</table>" << std::endl;
996 
997  *out << "<hr/>" << std::endl;
998 
999  std::ostringstream ost;
1000  if(myProcess_)
1001  ost << "/SubWeb?process=" << getpid() << "&method=moduleWeb&";
1002  else
1003  ost << "/moduleWeb?";
1004  urn += ost.str();
1005  if(evtProcessor_ && (myProcess_ || nbSubProcesses_.value_==0))
1007  else if(evtProcessor_)
1009  else
1010  *out << "<td>HLT Unconfigured</td>" << std::endl;
1011  *out << "</table>" << std::endl;
1012 
1013  *out << "<br><textarea rows=" << 10 << " cols=80 scroll=yes>" << std::endl;
1014  *out << configuration_ << std::endl;
1015  *out << "</textarea><P>" << std::endl;
1016 
1017  *out << "</body>" << std::endl;
1018  *out << "</html>" << std::endl;
1019 
1020 
1021 }
void summaryWebPage(xgi::Input *, xgi::Output *, const std::string &)
Definition: FWEPWrapper.cc:815
void taskWebPage(xgi::Input *, xgi::Output *, const std::string &)
Definition: FWEPWrapper.cc:881
evf::StateMachine fsm_
tuple out
Definition: dbtoconf.py:99
xdata::UnsignedInteger32 nbSubProcesses_
xdata::String * stateName()
Definition: StateMachine.h:69
void FUEventProcessor::startReceivingLoop ( )
private

Definition at line 1151 of file FUEventProcessor.cc.

References asReceiveMsgAndExecute_, alignCSCRings::e, edm::hlt::Exception, lumiQueryAPI::msg, receiving(), receiving_, and wlReceiving_.

Referenced by enableMPEPSlave(), and forkProcessesFromEDM().

1152 {
1153  try {
1154  wlReceiving_=
1155  toolbox::task::getWorkLoopFactory()->getWorkLoop("Receiving",
1156  "waiting");
1157  if (!wlReceiving_->isActive()) wlReceiving_->activate();
1158  asReceiveMsgAndExecute_ = toolbox::task::bind(this,&FUEventProcessor::receiving,
1159  "Receiving");
1161  receiving_ = true;
1162  }
1163  catch (xcept::Exception& e) {
1164  std::string msg = "Failed to start workloop 'Receiving'.";
1165  XCEPT_RETHROW(evf::Exception,msg,e);
1166  }
1167 }
toolbox::task::WorkLoop * wlReceiving_
toolbox::task::ActionSignature * asReceiveMsgAndExecute_
bool receiving(toolbox::task::WorkLoop *wl)
void FUEventProcessor::startReceivingMonitorLoop ( )
private

Definition at line 1168 of file FUEventProcessor.cc.

References asReceiveMsgAndRead_, alignCSCRings::e, edm::hlt::Exception, lumiQueryAPI::msg, receivingAndMonitor(), receivingM_, and wlReceivingMonitor_.

Referenced by enableMPEPSlave(), and forkProcessesFromEDM().

1169 {
1170  try {
1172  toolbox::task::getWorkLoopFactory()->getWorkLoop("ReceivingM",
1173  "waiting");
1174  if (!wlReceivingMonitor_->isActive())
1175  wlReceivingMonitor_->activate();
1177  toolbox::task::bind(this,&FUEventProcessor::receivingAndMonitor,
1178  "ReceivingM");
1180  receivingM_ = true;
1181  }
1182  catch (xcept::Exception& e) {
1183  std::string msg = "Failed to start workloop 'ReceivingM'.";
1184  XCEPT_RETHROW(evf::Exception,msg,e);
1185  }
1186 }
bool receivingAndMonitor(toolbox::task::WorkLoop *wl)
toolbox::task::ActionSignature * asReceiveMsgAndRead_
toolbox::task::WorkLoop * wlReceivingMonitor_
void FUEventProcessor::startScalersWorkLoop ( )
throw (evf::Exception
)
private

Definition at line 1558 of file FUEventProcessor.cc.

References asScalers_, alignCSCRings::e, edm::hlt::Exception, lumiQueryAPI::msg, scalers(), wlScalers_, and wlScalersActive_.

Referenced by enableMPEPSlave(), and forkProcessesFromEDM().

1559 {
1560  try {
1561  wlScalers_=
1562  toolbox::task::getWorkLoopFactory()->getWorkLoop("Scalers",
1563  "waiting");
1564  if (!wlScalers_->isActive()) wlScalers_->activate();
1565  asScalers_ = toolbox::task::bind(this,&FUEventProcessor::scalers,
1566  "Scalers");
1567 
1568  wlScalers_->submit(asScalers_);
1569  wlScalersActive_ = true;
1570  }
1571  catch (xcept::Exception& e) {
1572  std::string msg = "Failed to start workloop 'Scalers'.";
1573  XCEPT_RETHROW(evf::Exception,msg,e);
1574  }
1575 }
toolbox::task::WorkLoop * wlScalers_
bool scalers(toolbox::task::WorkLoop *wl)
toolbox::task::ActionSignature * asScalers_
void FUEventProcessor::startSignalMonitorWorkLoop ( )
throw (evf::Exception
)
private

Definition at line 1882 of file FUEventProcessor.cc.

References asSignalMonitor_, gather_cfg::cout, alignCSCRings::e, edm::hlt::Exception, lumiQueryAPI::msg, sigmon(), signalMonitorActive_, and wlSignalMonitor_.

Referenced by enabling().

1883 {
1884  //todo rewind/check semaphore
1885  //start workloop
1886  try {
1888  toolbox::task::getWorkLoopFactory()->getWorkLoop("SignalMonitor",
1889  "waiting");
1890 
1891  if (!wlSignalMonitor_->isActive()) wlSignalMonitor_->activate();
1892  asSignalMonitor_ = toolbox::task::bind(this,&FUEventProcessor::sigmon,
1893  "SignalMonitor");
1895  signalMonitorActive_ = true;
1896  }
1897  catch (xcept::Exception& e) {
1898  std::string msg = "Failed to start workloop 'SignalMonitor'. (3)";
1899  std::cout << e.what() << std::endl;
1900  XCEPT_RETHROW(evf::Exception,msg,e);
1901  }
1902 }
toolbox::task::ActionSignature * asSignalMonitor_
bool sigmon(toolbox::task::WorkLoop *wl)
toolbox::task::WorkLoop * wlSignalMonitor_
tuple cout
Definition: gather_cfg.py:121
void FUEventProcessor::startSummarizeWorkLoop ( )
throw (evf::Exception
)
private

Definition at line 1579 of file FUEventProcessor.cc.

References asSummarize_, alignCSCRings::e, edm::hlt::Exception, lumiQueryAPI::msg, summarize(), wlSummarize_, and wlSummarizeActive_.

Referenced by enabling().

1580 {
1581  try {
1582  wlSummarize_=
1583  toolbox::task::getWorkLoopFactory()->getWorkLoop("Summary",
1584  "waiting");
1585  if (!wlSummarize_->isActive()) wlSummarize_->activate();
1586 
1587  asSummarize_ = toolbox::task::bind(this,&FUEventProcessor::summarize,
1588  "Summary");
1589 
1590  wlSummarize_->submit(asSummarize_);
1591  wlSummarizeActive_ = true;
1592  }
1593  catch (xcept::Exception& e) {
1594  std::string msg = "Failed to start workloop 'Summarize'.";
1595  XCEPT_RETHROW(evf::Exception,msg,e);
1596  }
1597 }
bool summarize(toolbox::task::WorkLoop *wl)
toolbox::task::WorkLoop * wlSummarize_
toolbox::task::ActionSignature * asSummarize_
void FUEventProcessor::startSupervisorLoop ( )
private

Definition at line 1133 of file FUEventProcessor.cc.

References asSupervisor_, alignCSCRings::e, edm::hlt::Exception, lumiQueryAPI::msg, supervising_, supervisor(), and wlSupervising_.

Referenced by FUEventProcessor().

1134 {
1135  try {
1137  toolbox::task::getWorkLoopFactory()->getWorkLoop("Supervisor",
1138  "waiting");
1139  if (!wlSupervising_->isActive()) wlSupervising_->activate();
1140  asSupervisor_ = toolbox::task::bind(this,&FUEventProcessor::supervisor,
1141  "Supervisor");
1142  wlSupervising_->submit(asSupervisor_);
1143  supervising_ = true;
1144  }
1145  catch (xcept::Exception& e) {
1146  std::string msg = "Failed to start workloop 'Supervisor'.";
1147  XCEPT_RETHROW(evf::Exception,msg,e);
1148  }
1149 }
toolbox::task::ActionSignature * asSupervisor_
toolbox::task::WorkLoop * wlSupervising_
bool supervisor(toolbox::task::WorkLoop *wl)
bool FUEventProcessor::stopClassic ( )
private

Definition at line 2375 of file FUEventProcessor.cc.

References detachDqmFromShm(), alignCSCRings::e, edm::IEventProcessor::epSuccess, edm::IEventProcessor::epTimedOut, evtProcessor_, edm::hlt::Exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, hasShMem_, localLog(), reasonForFailedState_, evf::FWEPWrapper::stop(), and cms::Exception::what().

Referenced by receiving(), and stopping().

2376 {
2377  bool failed=false;
2378  try {
2379  LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping :) ...");
2382  fsm_.fireEvent("StopDone",this);
2383  else
2384  {
2385  failed=true;
2386  // epMState_ = evtProcessor_->currentStateName();
2388  reasonForFailedState_ = "EventProcessor stop timed out";
2389  else
2390  reasonForFailedState_ = "EventProcessor did not receive STOP event";
2391  }
2392  }
2393  catch (xcept::Exception &e) {
2394  failed=true;
2395  reasonForFailedState_ = "Stopping FAILED: " + (std::string)e.what();
2396  }
2397  catch (edm::Exception &e) {
2398  failed=true;
2399  reasonForFailedState_ = "Stopping FAILED: " + (std::string)e.what();
2400  }
2401  catch (...) {
2402  failed=true;
2403  reasonForFailedState_= "Stopping FAILED: unknown exception";
2404  }
2405  try {
2406  if (hasShMem_) {
2407  detachDqmFromShm();
2408  if (failed)
2409  LOG4CPLUS_WARN(getApplicationLogger(),
2410  "In failed STOP - success detaching DQM from Shm. pid:" << getpid());
2411  }
2412  }
2413  catch (cms::Exception & e) {
2414  failed=true;
2415  reasonForFailedState_= "Stopping FAILED: " + (std::string)e.what();
2416  }
2417  catch (...) {
2418  failed=true;
2419  reasonForFailedState_= "DQM detach failed: Unknown exception";
2420  }
2421 
2422  if (failed) {
2423  LOG4CPLUS_FATAL(getApplicationLogger(),"STOP failed: "
2424  << reasonForFailedState_ << " (pid:" << getpid()<<")");
2427  }
2428 
2429  LOG4CPLUS_INFO(getApplicationLogger(),"Finished stopping!");
2430  localLog("-I- Stop completed");
2431  return false;
2432 }
virtual char const * what() const
Definition: Exception.cc:141
xdata::Boolean hasShMem_
void fireFailed(const std::string &errorMsg, void *originator)
evf::StateMachine fsm_
std::string reasonForFailedState_
void fireEvent(const std::string &evtType, void *originator)
void localLog(std::string)
edm::EventProcessor::StatusCode stop()
Definition: FWEPWrapper.cc:503
bool FUEventProcessor::stopping ( toolbox::task::WorkLoop *  wl)

Definition at line 737 of file FUEventProcessor.cc.

References doEndRunInEDM(), forkInEDM_, hasShMem_, nbSubProcesses_, rlimit_coresize_default_, sigmon_sem_, evf::Vulture::stop(), stopClassic(), stopSlavesAndAcknowledge(), and vulture_.

Referenced by sigmon(), and supervisor().

738 {
739  setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
740  if(nbSubProcesses_.value_!=0) {
742  if (forkInEDM_.value_) {
743  //only in new forking for now
744  sem_post(sigmon_sem_);
745  if (!doEndRunInEDM())
746  return false;
747  }
748  }
749  vulture_->stop();
750 
751  if (forkInEDM_.value_) {
752  //shared memory was already disconnected in master
753  bool tmpHasShMem_=hasShMem_;
754  hasShMem_=false;
755  stopClassic();
756  hasShMem_=tmpHasShMem_;
757  return false;
758  }
759  stopClassic();
760  return false;
761 }
int stop()
Definition: Vulture.cc:254
xdata::UnsignedInteger32 forkInEDM_
xdata::Boolean hasShMem_
xdata::UnsignedInteger32 nbSubProcesses_
void FUEventProcessor::stopSlavesAndAcknowledge ( )
private

Definition at line 2434 of file FUEventProcessor.cc.

References alignCSCRings::e, i, localLog(), MAX_MSG_SIZE, lumiQueryAPI::msg, MSQM_MESSAGE_TYPE_STOP, MSQS_MESSAGE_TYPE_STOP, nbSubProcesses_, reasonForFailedState_, stop_lock_, and subs_.

Referenced by halting(), and stopping().

2435 {
2438 
2439  std::vector<bool> processes_to_stop(nbSubProcesses_.value_,false);
2440  for(unsigned int i = 0; i < subs_.size(); i++)
2441  {
2442  pthread_mutex_lock(&stop_lock_);
2443  if(subs_[i].alive()>0){
2444  processes_to_stop[i] = true;
2445  subs_[i].post(msg,false);
2446  }
2447  pthread_mutex_unlock(&stop_lock_);
2448  }
2449  for(unsigned int i = 0; i < subs_.size(); i++)
2450  {
2451  pthread_mutex_lock(&stop_lock_);
2452  if(processes_to_stop[i]){
2453  try{
2454  subs_[i].rcv(msg1,false);
2455  }
2456  catch(evf::Exception &e){
2457  std::ostringstream ost;
2458  ost << "failed to get STOP - errno ->" << errno << " " << e.what();
2459  reasonForFailedState_ = ost.str();
2460  LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
2461  // fsm_.fireFailed(reasonForFailedState_,this);
2463  pthread_mutex_unlock(&stop_lock_);
2464  continue;
2465  }
2466  }
2467  else {
2468  pthread_mutex_unlock(&stop_lock_);
2469  continue;
2470  }
2471  pthread_mutex_unlock(&stop_lock_);
2472  if(msg1->mtype==MSQS_MESSAGE_TYPE_STOP)
2473  while(subs_[i].alive()>0) ::usleep(10000);
2474  subs_[i].disconnect();
2475  }
2476  // subs_.clear();
2477 
2478 }
int i
Definition: DBlmapReader.cc:9
#define MAX_MSG_SIZE
Definition: queue_defs.h:10
std::vector< SubProcess > subs_
#define MSQS_MESSAGE_TYPE_STOP
Definition: queue_defs.h:31
std::string reasonForFailedState_
void localLog(std::string)
xdata::UnsignedInteger32 nbSubProcesses_
#define MSQM_MESSAGE_TYPE_STOP
Definition: queue_defs.h:16
pthread_mutex_t stop_lock_
void FUEventProcessor::subWeb ( xgi::Input * in,
xgi::Output * out 
)

Definition at line 853 of file FUEventProcessor.cc.

References anonymousPipe_, gather_cfg::cout, run_regression::done, asciidump::els, i, j, Association::map, MAX_MSG_SIZE, MAX_PIPE_BUFFER_SIZE, text2workspace::mod, lumiQueryAPI::msg, MSGQ_MESSAGE_TYPE_RANGE, MSQM_MESSAGE_TYPE_WEB, MSQS_MESSAGE_TYPE_WEB, evf::utils::pid, csv2json::pieces, PIPE_READ, SiPixelLorentzAngle_cfi::read, stor::utils::sleep(), subs_, and superSleepSec_.

Referenced by FUEventProcessor().

854 {
855  using namespace cgicc;
856  pid_t pid = 0;
857  std::ostringstream ost;
858  ost << "&";
859 
860  Cgicc cgi(in);
862  for(std::map<std::string, std::string, std::less<std::string> >::iterator mit =
863  mycgi->getEnvironment().begin();
864  mit != mycgi->getEnvironment().end(); mit++)
865  ost << mit->first << "%" << mit->second << ";";
866  std::vector<FormEntry> els = cgi.getElements() ;
867  std::vector<FormEntry> el1;
868  cgi.getElement("method",el1);
869  std::vector<FormEntry> el2;
870  cgi.getElement("process",el2);
871  if(el1.size()!=0) {
872  std::string meth = el1[0].getValue();
873  if(el2.size()!=0) {
874  unsigned int i = 0;
875  std::string mod = el2[0].getValue();
876  pid = atoi(mod.c_str()); // get the process id to be polled
877  for(; i < subs_.size(); i++)
878  if(subs_[i].pid()==pid) break;
879  if(i>=subs_.size()){ //process was not found, let the browser know
880  *out << "ERROR 404 : Process " << pid << " Not Found !" << std::endl;
881  return;
882  }
883  if(subs_[i].alive() != 1){
884  *out << "ERROR 405 : Process " << pid << " Not Alive !" << std::endl;
885  return;
886  }
887  MsgBuf msg1(meth.length()+ost.str().length()+1,MSQM_MESSAGE_TYPE_WEB);
888  strncpy(msg1->mtext,meth.c_str(),meth.length());
889  strncpy(msg1->mtext+meth.length(),ost.str().c_str(),ost.str().length());
890  subs_[i].post(msg1,true);
891  unsigned int keep_supersleep_original_value = superSleepSec_.value_;
892  superSleepSec_.value_=10*keep_supersleep_original_value;
894  bool done = false;
895  std::vector<char *>pieces;
896  while(!done){
897  unsigned long retval1 = subs_[i].rcvNonBlocking(msg,true);
898  if(retval1 == MSGQ_MESSAGE_TYPE_RANGE){
899  ::sleep(1);
900  continue;
901  }
902  unsigned int nbytes = atoi(msg->mtext);
903  if(nbytes < MAX_PIPE_BUFFER_SIZE) done = true; // this will break the while loop
904  char *buf= new char[nbytes];
905  ssize_t retval = read(anonymousPipe_[PIPE_READ],buf,nbytes);
906  if(retval!=nbytes) std::cout
907  << "CAREFUL HERE, read less bytes than expected from pipe in subWeb" << std::endl;
908  pieces.push_back(buf);
909  }
910  superSleepSec_.value_=keep_supersleep_original_value;
911  for(unsigned int j = 0; j < pieces.size(); j++){
912  *out<<pieces[j]; // chain the buffers into the output strstream
913  delete[] pieces[j]; //make sure to release all buffers used for reading the pipe
914  }
915  }
916  }
917 }
int i
Definition: DBlmapReader.cc:9
#define MSGQ_MESSAGE_TYPE_RANGE
Definition: queue_defs.h:13
tuple pieces
Definition: csv2json.py:31
tuple els
Definition: asciidump.py:420
#define MAX_MSG_SIZE
Definition: queue_defs.h:10
void sleep(Duration_t)
Definition: Utils.h:163
dictionary map
Definition: Association.py:205
std::vector< SubProcess > subs_
#define MSQS_MESSAGE_TYPE_WEB
Definition: queue_defs.h:33
#define PIPE_READ
Definition: queue_defs.h:40
int j
Definition: DBlmapReader.cc:9
list mod
Load physics model.
tuple out
Definition: dbtoconf.py:99
xdata::UnsignedInteger32 superSleepSec_
tuple cout
Definition: gather_cfg.py:121
#define MSQM_MESSAGE_TYPE_WEB
Definition: queue_defs.h:20
#define MAX_PIPE_BUFFER_SIZE
Definition: queue_defs.h:42
bool FUEventProcessor::summarize ( toolbox::task::WorkLoop *  wl)
private

Definition at line 1629 of file FUEventProcessor.cc.

References allProcStats_, gather_cfg::cout, cpustat_, alignCSCRings::e, evf::TriggerReportStatic::eventSummary, evtProcessor_, evf::FWEPWrapper::fireScalersUpdate(), fsm_, evf::FWEPWrapper::getLumiSectionReferenceIndex(), evf::FWEPWrapper::getPackedTriggerReportAsStruct(), i, iDieStatisticsGathering_, idleProcStats_, lastProcReport_, evf::TriggerReportStatic::lumiSection, master_message_trr_, MSQS_MESSAGE_TYPE_TRR, evf::TriggerReportStatic::nbExpected, evf::TriggerReportStatic::nbReporting, nbSubProcesses_, nbSubProcessesReporting_, evf::utils::procCpuStat(), ratestat_, evf::CPUStat::reset(), evf::FWEPWrapper::resetPackedTriggerReport(), run_regression::ret, evf::RateStat::sendStat(), evf::CPUStat::sendStat(), evf::CPUStat::setCPUStat(), evf::CPUStat::setElapsed(), evf::CPUStat::setNproc(), stor::utils::sleep(), evf::StateMachine::stateName(), subs_, evf::FWEPWrapper::sumAndPackTriggerReport(), edm::EventSummary::totalEvents, evf::FWEPWrapper::updateRollingReport(), evf::FWEPWrapper::withdrawLumiSectionIncrement(), and wlScalersActive_.

Referenced by startSummarizeWorkLoop().

1630 {
1632  bool atLeastOneProcessUpdatedSuccessfully = false;
1633  int msgCount = 0;
1634  for (unsigned int i = 0; i < subs_.size(); i++)
1635  {
1636  if(subs_[i].alive()>0)
1637  {
1638  int ret = 0;
1639  if(subs_[i].check_postponed_trigger_update(master_message_trr_,
1641  {
1642  ret = MSQS_MESSAGE_TYPE_TRR;
1643  std::cout << "using postponed report from slot " << i << " for ls " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
1644  }
1645  else{
1646  bool insync = false;
1647  bool exception_caught = false;
1648  while(!insync){
1649  try{
1650  ret = subs_[i].rcv(master_message_trr_,false);
1651  }
1652  catch(evf::Exception &e)
1653  {
1654  std::cout << "exception in msgrcv on " << i
1655  << " " << subs_[i].alive() << " " << strerror(errno) << std::endl;
1656  exception_caught = true;
1657  break;
1658  //do nothing special
1659  }
1660  if(ret==MSQS_MESSAGE_TYPE_TRR) {
1663  insync = true;
1664  }
1665  }
1666  }
1667  if(exception_caught) continue;
1668  }
1669  msgCount++;
1670  if(ret==MSQS_MESSAGE_TYPE_TRR) {
1673  std::cout << "postpone handling of msg from slot " << i << " with Ls " << trp->lumiSection
1674  << " should be " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
1675  subs_[i].add_postponed_trigger_update(master_message_trr_);
1676  }else{
1677  atLeastOneProcessUpdatedSuccessfully = true;
1679  }
1680  }
1681  else std::cout << "msgrcv returned error " << errno << std::endl;
1682  }
1683  }
1684  if(atLeastOneProcessUpdatedSuccessfully){
1685  nbSubProcessesReporting_.value_ = msgCount;
1690  }
1691  else{
1692  LOG4CPLUS_WARN(getApplicationLogger(),"Summarize loop: no process updated successfully - sleep 10 seconds before trying again");
1693  if(msgCount==0) evtProcessor_.withdrawLumiSectionIncrement();
1694  nbSubProcessesReporting_.value_ = 0;
1695  ::sleep(10);
1696  }
1697  if(fsm_.stateName()->toString()!="Enabled"){
1698  wlScalersActive_ = false;
1699  return false;
1700  }
1701  // cpustat_->printStat();
1702  if(iDieStatisticsGathering_.value_){
1703  try{
1704  unsigned long long idleTmp=idleProcStats_;
1705  unsigned long long allPSTmp=allProcStats_;
1707 
1709  timeval oldtime=lastProcReport_;
1710  gettimeofday(&lastProcReport_,0);
1711 
1712  if (allPSTmp!=0 && idleTmp!=0 && allProcStats_!=allPSTmp) {
1713  cpustat_->setCPUStat(1000 - ((idleProcStats_-idleTmp)*1000)/(allProcStats_-allPSTmp));
1714  int deltaTms=1000 * (lastProcReport_.tv_sec-oldtime.tv_sec)
1715  + (lastProcReport_.tv_usec-oldtime.tv_usec)/1000;
1716  cpustat_->setElapsed(deltaTms);
1717  }
1718  else {
1719  cpustat_->setCPUStat(0);
1720  cpustat_->setElapsed(0);
1721  }
1722 
1726  ratestat_->sendStat((unsigned char*)trsp,
1727  sizeof(TriggerReportStatic),
1729  }catch(evf::Exception &e){
1730  LOG4CPLUS_INFO(getApplicationLogger(),"coud not send statistics"
1731  << e.what());
1732  }
1733  }
1734  cpustat_->reset();
1735  return true;
1736 }
int i
Definition: DBlmapReader.cc:9
void updateRollingReport()
#define MSQS_MESSAGE_TYPE_TRR
Definition: queue_defs.h:34
void setCPUStat(int busyPer1k)
Definition: CPUStat.h:25
xdata::UnsignedInteger32 nbSubProcessesReporting_
void sleep(Duration_t)
Definition: Utils.h:163
void setElapsed(int mseconds)
Definition: CPUStat.h:28
std::vector< SubProcess > subs_
bool fireScalersUpdate()
Definition: FWEPWrapper.cc:777
evf::StateMachine fsm_
unsigned long long idleProcStats_
xdata::Boolean iDieStatisticsGathering_
void setNproc(int nproc)
Definition: CPUStat.h:22
void resetPackedTriggerReport()
Definition: FWEPWrapper.h:111
void sendStat(unsigned int)
Definition: CPUStat.cc:33
void reset()
Definition: CPUStat.h:31
xdata::UnsignedInteger32 nbSubProcesses_
xdata::String * stateName()
Definition: StateMachine.h:69
unsigned int getLumiSectionReferenceIndex()
Definition: FWEPWrapper.h:139
unsigned long long allProcStats_
void procCpuStat(unsigned long long &idleJiffies, unsigned long long &allJiffies)
Definition: procUtils.cc:157
tuple cout
Definition: gather_cfg.py:121
edm::EventSummary eventSummary
void withdrawLumiSectionIncrement()
Definition: FWEPWrapper.h:138
void sumAndPackTriggerReport(MsgBuf &buf)
TriggerReportStatic * getPackedTriggerReportAsStruct()
Definition: FWEPWrapper.h:115
void sendStat(const unsigned char *, size_t, unsigned int)
Definition: RateStat.cc:19
bool FUEventProcessor::supervisor ( toolbox::task::WorkLoop *  wl)
private

Definition at line 1249 of file FUEventProcessor.cc.

References evf::CPUStat::addEntry(), evf::FWEPWrapper::adjustLsIndexForRestart(), applicationInfoSpace_, autoRestartSlaves_, gather_cfg::cout, cpustat_, crashesThisRun_, delta, evf::StateMachine::disableRcmsStateNotification(), evf::prg::dqm, alignCSCRings::e, edm_init_done_, enableMPEPSlave(), evtProcessor_, cppFunctionSkipper::exception, edm::hlt::Exception, spr::find(), evf::StateMachine::fireEvent(), forkInEDM_, fsm_, i, lastCrashTime_, localLog(), log_, evf::prg::ls, python.rootplot.utilities::ls(), master_message_prg_, master_message_prr_, evf::FWEPWrapper::moduleNameFromIndex(), monitorInfoSpace_, evf::prg::Ms, evf::prg::ms, MSQM_MESSAGE_TYPE_FSTOP, MSQM_MESSAGE_TYPE_RLR, myProcess_, names_, nbAccepted, nbdead_, nblive_, nbProcessed, nbSubProcesses_, nbTotalDQM_, evf::FWEPWrapper::notstarted_state_code(), NUMERIC_MESSAGE_SIZE, AlCaHLTBitMon_ParallelJobs::p, pickup_lock_, evf::prg::ps, evf::FWEPWrapper::resetPackedTriggerReport(), restartForkInEDM(), rlimit_coresize_changed_, rlimit_coresize_default_, findQualityFiles::rr, scalersUpdates_, edm::event_processor::sError, edm::event_processor::sInit, edm::event_processor::sInvalid, slaveRestartDelaySecs_, stor::utils::sleep(), spMStates_, spmStates_, edm::event_processor::sStopping, evf::StateMachine::stateName(), evf::FWEPWrapper::stateNameFromIndex(), stop_lock_, stopping(), subs_, superSleepSec_, and evf::prg::trp.

Referenced by startSupervisorLoop().

1250 {
1251  pthread_mutex_lock(&stop_lock_);
1252  if(subs_.size()!=nbSubProcesses_.value_)
1253  {
1254  pthread_mutex_lock(&pickup_lock_);
1255  if(subs_.size()!=nbSubProcesses_.value_) {
1256  subs_.resize(nbSubProcesses_.value_);
1257  spMStates_.resize(nbSubProcesses_.value_);
1258  spmStates_.resize(nbSubProcesses_.value_);
1259  for(unsigned int i = 0; i < spMStates_.size(); i++)
1260  {
1262  spmStates_[i] = 0;
1263  }
1264  }
1265  pthread_mutex_unlock(&pickup_lock_);
1266  }
1267  bool running = fsm_.stateName()->toString()=="Enabled";
1268  bool stopping = fsm_.stateName()->toString()=="stopping";
1269  for(unsigned int i = 0; i < subs_.size(); i++)
1270  {
1271  if(subs_[i].alive()==-1000) continue;
1272  int sl;
1273  pid_t sub_pid = subs_[i].pid();
1274  pid_t killedOrNot = waitpid(sub_pid,&sl,WNOHANG);
1275 
1276  if(killedOrNot && killedOrNot==sub_pid) {
1277  pthread_mutex_lock(&pickup_lock_);
1278  //check if out of range or recreated (enable can clear vector)
1279  if (i<subs_.size() && subs_[i].alive()!=-1000) {
1280  subs_[i].setStatus((WIFEXITED(sl) != 0 ? 0 : -1));
1281  std::ostringstream ost;
1282  if(subs_[i].alive()==0) ost << " process exited with status " << WEXITSTATUS(sl);
1283  else if(WIFSIGNALED(sl)!=0) {
1284  ost << " process terminated with signal " << WTERMSIG(sl);
1285  }
1286  else ost << " process stopped ";
1287  //report unexpected slave exit in stop
1288  //if (stopping && (WEXITSTATUS(sl)!=0 || WIFSIGNALED(sl)!=0)) {
1289  // LOG4CPLUS_WARN(getApplicationLogger(),ost.str() << ", slave pid:"<<getpid());
1290  //}
1291  subs_[i].countdown()=slaveRestartDelaySecs_.value_;
1292  subs_[i].setReasonForFailed(ost.str());
1294  spmStates_[i] = 0;
1295  std::ostringstream ost1;
1296  ost1 << "-E- Slave " << subs_[i].pid() << ost.str();
1297  localLog(ost1.str());
1298  if(!autoRestartSlaves_.value_) subs_[i].disconnect();
1299  }
1300  pthread_mutex_unlock(&pickup_lock_);
1301  }
1302  }
1303  pthread_mutex_unlock(&stop_lock_);
1304  if(stopping) return true; // if in stopping we are done
1305 
1306  // check if we need to reset core dumps (15 min after last one)
1307  if (running && rlimit_coresize_changed_) {
1308  timeval newtv;
1309  gettimeofday(&newtv,0);
1310  int delta = newtv.tv_sec-lastCrashTime_.tv_sec;
1311  if (delta>60*15) {
1312  std::ostringstream ostr;
1313  ostr << " No more slave EP crashes on this machine in last 15 min. resetting core size limits";
1314  std::cout << ostr.str() << std::endl;
1315  LOG4CPLUS_INFO(getApplicationLogger(),ostr.str());
1316  setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
1317  MsgBuf master_message_rlr_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLR);
1318  for (unsigned int i = 0; i < subs_.size(); i++) {
1319  try {
1320  if (subs_[i].alive())
1321  subs_[i].post(master_message_rlr_,false);
1322  }
1323  catch (...) {}
1324  }
1326  crashesThisRun_=0;
1327  }
1328  }
1329 
1330  if(running && edm_init_done_)
1331  {
1332  // if enabled, this loop will periodically check if dead slaves countdown has expired and restart them
1333  // this is only active while running, hence, the stop lock is acquired and only released at end of loop
1334  if(autoRestartSlaves_.value_){
1335  pthread_mutex_lock(&stop_lock_); //lockout slave killing at stop while you check for restarts
1336  for(unsigned int i = 0; i < subs_.size(); i++)
1337  {
1338  if(subs_[i].alive() != 1){
1339  if(subs_[i].countdown() == 0)
1340  {
1341  if(subs_[i].restartCount()>2){
1342  LOG4CPLUS_WARN(getApplicationLogger()," Not restarting subprocess in slot " << i
1343  << " - maximum restart count reached");
1344  std::ostringstream ost1;
1345  ost1 << "-W- Dead Process in slot " << i << " reached maximum restart count";
1346  localLog(ost1.str());
1347  subs_[i].countdown()--;
1348  XCEPT_DECLARE(evf::Exception,
1349  sentinelException, ost1.str());
1350  notifyQualified("error",sentinelException);
1351  subs_[i].disconnect();
1352  continue;
1353  }
1354  subs_[i].restartCount()++;
1355  if (forkInEDM_.value_) {
1356  restartForkInEDM(i);
1357  }
1358  else {
1359  pid_t rr = subs_[i].forkNew();
1360  if(rr==0)
1361  {
1362  myProcess_=&subs_[i];
1363  scalersUpdates_ = 0;
1364  int retval = pthread_mutex_destroy(&stop_lock_);
1365  if(retval != 0) perror("error");
1366  retval = pthread_mutex_init(&stop_lock_,0);
1367  if(retval != 0) perror("error");
1369  fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
1370  fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition
1371  fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition
1372  try{
1373  xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
1374  if(lsid) {
1375  ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died
1376  }
1377  }
1378  catch(...){
1379  std::cout << "trouble with lsindex during restart" << std::endl;
1380  }
1381  try{
1382  xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
1383  if(lstb) {
1384  ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting
1385  }
1386  }
1387  catch(...){
1388  std::cout << "trouble with resetting flag for eol recovery " << std::endl;
1389  }
1390 
1393  enableMPEPSlave();
1394  return false; // exit the supervisor loop immediately in the child !!!
1395  }
1396  else
1397  {
1398  std::ostringstream ost1;
1399  ost1 << "-I- New Process " << rr << " forked for slot " << i;
1400  localLog(ost1.str());
1401  }
1402  }
1403  }
1404  if(subs_[i].countdown()>=0) subs_[i].countdown()--;
1405  }
1406  }
1407  pthread_mutex_unlock(&stop_lock_);
1408  } // finished handling replacement of dead slaves once they've been reaped
1409  }
1410  xdata::Serializable *lsid = 0;
1411  xdata::Serializable *psid = 0;
1412  xdata::Serializable *dqmp = 0;
1413  xdata::UnsignedInteger32 *dqm = 0;
1414 
1415 
1416 
1417  if(running && edm_init_done_){
1418  try{
1419  lsid = applicationInfoSpace_->find("lumiSectionIndex");
1420  psid = applicationInfoSpace_->find("prescaleSetIndex");
1421  nbProcessed = monitorInfoSpace_->find("nbProcessed");
1422  nbAccepted = monitorInfoSpace_->find("nbAccepted");
1423  dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
1424  }
1426  LOG4CPLUS_INFO(getApplicationLogger(),"could not retrieve some data - " << e.what());
1427  }
1428 
1429  try{
1430  if(nbProcessed !=0 && nbAccepted !=0)
1431  {
1432  xdata::UnsignedInteger32*nbp = ((xdata::UnsignedInteger32*)nbProcessed);
1433  xdata::UnsignedInteger32*nba = ((xdata::UnsignedInteger32*)nbAccepted);
1434  xdata::UnsignedInteger32*ls = ((xdata::UnsignedInteger32*)lsid);
1435  xdata::UnsignedInteger32*ps = ((xdata::UnsignedInteger32*)psid);
1436  if(dqmp!=0)
1437  dqm = (xdata::UnsignedInteger32*)dqmp;
1438  if(dqm) dqm->value_ = 0;
1439  nbTotalDQM_ = 0;
1440  nbp->value_ = 0;
1441  nba->value_ = 0;
1442  nblive_ = 0;
1443  nbdead_ = 0;
1444  scalersUpdates_ = 0;
1445 
1446  for(unsigned int i = 0; i < subs_.size(); i++)
1447  {
1448  if(subs_[i].alive()>0)
1449  {
1450  nblive_++;
1451  try{
1452  subs_[i].post(master_message_prg_,true);
1453 
1454  unsigned long retval = subs_[i].rcvNonBlocking(master_message_prr_,true);
1455  if(retval == (unsigned long) master_message_prr_->mtype){
1456  prg* p = (struct prg*)(master_message_prr_->mtext);
1457  subs_[i].setParams(p);
1458  spMStates_[i] = p->Ms;
1459  spmStates_[i] = p->ms;
1460  cpustat_->addEntry(p->ms);
1461  if(!subs_[i].inInconsistentState() &&
1465  {
1466  std::ostringstream ost;
1467  ost << "edm::eventprocessor slot " << i << " process id "
1468  << subs_[i].pid() << " not in Running state : Mstate="
1469  << evtProcessor_.stateNameFromIndex(p->Ms) << " mstate="
1471  << " - Look into possible error messages from HLT process";
1472  LOG4CPLUS_WARN(getApplicationLogger(),ost.str());
1473  }
1474  nbp->value_ += subs_[i].params().nbp;
1475  nba->value_ += subs_[i].params().nba;
1476  if(dqm)dqm->value_ += p->dqm;
1477  nbTotalDQM_ += p->dqm;
1478  scalersUpdates_ += p->trp;
1479  if(p->ls > ls->value_) ls->value_ = p->ls;
1480  if(p->ps != ps->value_) ps->value_ = p->ps;
1481  }
1482  else{
1483  nbp->value_ += subs_[i].get_save_nbp();
1484  nba->value_ += subs_[i].get_save_nba();
1485  }
1486  }
1487  catch(evf::Exception &e){
1488  LOG4CPLUS_INFO(getApplicationLogger(),
1489  "could not send/receive msg on slot "
1490  << i << " - " << e.what());
1491  }
1492 
1493  }
1494  else
1495  {
1496  nbp->value_ += subs_[i].get_save_nbp();
1497  nba->value_ += subs_[i].get_save_nba();
1498  nbdead_++;
1499  }
1500  }
1501  if(nbp->value_>64){//have some slaves already processed more than one event ? (eventually make this == number of raw cells)
1502  for(unsigned int i = 0; i < subs_.size(); i++)
1503  {
1504  if(subs_[i].params().nbp == 0){ // a slave has processed 0 events
1505  // check that the process is not stuck
1506  if(subs_[i].alive()>0 && subs_[i].params().ms == 0) // the process is seen alive but in us=Invalid(0)
1507  {
1508  subs_[i].found_invalid();//increase the "found_invalid" counter
1509  if(subs_[i].nfound_invalid() > 60){ //wait x monitor cycles (~1 min a good time ?) before doing something about a stuck slave
1510  MsgBuf msg3(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_FSTOP); // send a force-stop signal
1511  subs_[i].post(msg3,false);
1512  std::ostringstream ost1;
1513  ost1 << "-W- Process in slot " << i << " Never reached the running state - forcestopping it";
1514  localLog(ost1.str());
1515  LOG4CPLUS_ERROR(getApplicationLogger(),ost1.str());
1516  XCEPT_DECLARE(evf::Exception,
1517  sentinelException, ost1.str());
1518  notifyQualified("error",sentinelException);
1519 
1520  }
1521  }
1522  }
1523  }
1524  }
1525  }
1526  }
1527  catch(std::exception &e){
1528  LOG4CPLUS_INFO(getApplicationLogger(),"std exception - " << e.what());
1529  }
1530  catch(...){
1531  LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception ");
1532  }
1533  }
1534  else{
1535  for(unsigned int i = 0; i < subs_.size(); i++)
1536  {
1537  if(subs_[i].alive()==-1000)
1538  {
1540  spmStates_[i] = 0;
1541  }
1542  }
1543  }
1544  try{
1545  monitorInfoSpace_->lock();
1546  monitorInfoSpace_->fireItemGroupChanged(names_,0);
1547  monitorInfoSpace_->unlock();
1548  }
1549  catch(xdata::exception::Exception &e)
1550  {
1551  LOG4CPLUS_ERROR(log_, "Exception from fireItemGroupChanged: " << e.what());
1552  // localLog(e.what());
1553  }
1554  ::sleep(superSleepSec_.value_);
1555  return true;
1556 }
dbl * delta
Definition: mlp_gen.cc:36
unsigned int ps
Definition: queue_defs.h:60
xdata::Vector< xdata::Integer > spMStates_
int i
Definition: DBlmapReader.cc:9
xdata::InfoSpace * monitorInfoSpace_
xdata::UnsignedInteger32 slaveRestartDelaySecs_
bool restartForkInEDM(unsigned int slotId)
xdata::UnsignedInteger32 forkInEDM_
#define MSQM_MESSAGE_TYPE_FSTOP
Definition: queue_defs.h:17
unsigned int ls
Definition: queue_defs.h:58
xdata::Boolean autoRestartSlaves_
std::string const & moduleNameFromIndex(unsigned int i) const
Definition: FWEPWrapper.h:122
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
unsigned int crashesThisRun_
void sleep(Duration_t)
Definition: Utils.h:163
#define MSQM_MESSAGE_TYPE_RLR
Definition: queue_defs.h:25
std::vector< SubProcess > subs_
xdata::Serializable * nbProcessed
void adjustLsIndexForRestart()
Definition: FWEPWrapper.h:112
evf::StateMachine fsm_
unsigned int Ms
Definition: queue_defs.h:63
void resetPackedTriggerReport()
Definition: FWEPWrapper.h:111
std::list< std::string > names_
void addEntry(int sta)
Definition: CPUStat.h:17
#define NUMERIC_MESSAGE_SIZE
Definition: queue_defs.h:38
bool stopping(toolbox::task::WorkLoop *wl)
void fireEvent(const std::string &evtType, void *originator)
xdata::Serializable * nbAccepted
void disableRcmsStateNotification()
Definition: StateMachine.h:64
void localLog(std::string)
int notstarted_state_code() const
Definition: FWEPWrapper.h:132
xdata::InfoSpace * applicationInfoSpace_
xdata::UnsignedInteger32 nbSubProcesses_
xdata::String * stateName()
Definition: StateMachine.h:69
unsigned int scalersUpdates_
xdata::UnsignedInteger32 superSleepSec_
std::string const & stateNameFromIndex(unsigned int i) const
Definition: FWEPWrapper.h:127
pthread_mutex_t pickup_lock_
unsigned int trp
Definition: queue_defs.h:66
xdata::Vector< xdata::Integer > spmStates_
tuple cout
Definition: gather_cfg.py:121
pthread_mutex_t stop_lock_
unsigned int dqm
Definition: queue_defs.h:65
unsigned int ms
Definition: queue_defs.h:64
void FUEventProcessor::updater ( xgi::Input in,
xgi::Output out 
)

Definition at line 2591 of file FUEventProcessor.cc.

References evf::lsTriplet::acc, evf::utils::cDiv(), configString_, cpustat_, evtProcessor_, fsm_, evf::CPUStat::getChart(), evf::Vulture::hasStarted(), i, iDieUrl_, evf::FWEPWrapper::lastLumi(), logRing_, logRingIndex_, logRingSize_, logWrap_, evf::lsTriplet::ls, evf::utils::mDiv(), myProcess_, nbAccepted, nbProcessed, nbSubProcesses_, nbSubProcessesReporting_, evf::lsTriplet::proc, runNumber_, squidPresent_, evf::StateMachine::stateName(), supervising_, updaterStatic_, evf::utils::uptime(), vp_, vulture_, evf::FWEPWrapper::wlMonitoring(), wlScalersActive_, and wlSummarizeActive_.

Referenced by FUEventProcessor().

2592 {
2593  using namespace utils;
2594 
2595  *out << updaterStatic_;
2596  mDiv(out,"loads");
2597  uptime(out);
2598  cDiv(out);
2599  mDiv(out,"st",fsm_.stateName()->toString());
2600  mDiv(out,"ru",runNumber_.toString());
2601  mDiv(out,"nsl",nbSubProcesses_.value_);
2602  mDiv(out,"nsr",nbSubProcessesReporting_.value_);
2603  mDiv(out,"cl");
2604  *out << getApplicationDescriptor()->getClassName()
2605  << (nbSubProcesses_.value_ > 0 ? "MP " : " ");
2606  cDiv(out);
2607  mDiv(out,"in",getApplicationDescriptor()->getInstance());
2608  if(fsm_.stateName()->toString() != "Halted" && fsm_.stateName()->toString() != "halting"){
2609  mDiv(out,"hlt");
2610  *out << "<a href=\"" << configString_.toString() << "\">HLT Config</a>";
2611  cDiv(out);
2612  *out << std::endl;
2613  }
2614  else
2615  mDiv(out,"hlt","Not yet...");
2616 
2617  mDiv(out,"sq",squidPresent_.toString());
2618  mDiv(out,"vwl",(supervising_ ? "Active" : "Not Initialized"));
2620  if(nbProcessed != 0 && nbAccepted != 0)
2621  {
2622  mDiv(out,"tt",((xdata::UnsignedInteger32*)nbProcessed)->value_);
2623  mDiv(out,"ac",((xdata::UnsignedInteger32*)nbAccepted)->value_);
2624  }
2625  else
2626  {
2627  mDiv(out,"tt",0);
2628  mDiv(out,"ac",0);
2629  }
2630  if(!myProcess_)
2631  mDiv(out,"swl",(wlSummarizeActive_ ? "Active" : "Inactive"));
2632  else
2633  mDiv(out,"swl",(wlScalersActive_ ? "Active" : "Inactive"));
2634 
2635  mDiv(out,"idi",iDieUrl_.value_);
2636  if(vp_!=0){
2637  mDiv(out,"vpi",(unsigned int) vp_);
2638  if(vulture_->hasStarted()>=0)
2639  mDiv(out,"vul","Prowling");
2640  else
2641  mDiv(out,"vul","Dead");
2642  }
2643  else{
2644  mDiv(out,"vul",(vulture_==0 ? "Nope" : "Hatching"));
2645  }
2646  if(evtProcessor_){
2647  mDiv(out,"ll");
2649  << "," << evtProcessor_.lastLumi().proc << "," << evtProcessor_.lastLumi().acc;
2650  cDiv(out);
2651  }
2652  mDiv(out,"lg");
2653  for(unsigned int i = logRingIndex_; i<logRingSize_; i++)
2654  *out << logRing_[i] << std::endl;
2655  if(logWrap_)
2656  for(unsigned int i = 0; i<logRingIndex_; i++)
2657  *out << logRing_[i] << std::endl;
2658  cDiv(out);
2659  mDiv(out,"cha");
2660  if(cpustat_) *out << cpustat_->getChart();
2661  cDiv(out);
2662 }
int i
Definition: DBlmapReader.cc:9
std::vector< std::string > logRing_
xdata::UnsignedInteger32 runNumber_
unsigned int acc
Definition: FWEPWrapper.h:45
xdata::UnsignedInteger32 nbSubProcessesReporting_
void cDiv(std::ostringstream *out)
Definition: procUtils.cc:357
lsTriplet & lastLumi()
Definition: FWEPWrapper.h:133
xdata::Serializable * nbProcessed
std::string wlMonitoring()
Definition: FWEPWrapper.h:98
evf::StateMachine fsm_
unsigned int proc
Definition: FWEPWrapper.h:44
xdata::Boolean squidPresent_
xdata::Serializable * nbAccepted
tuple out
Definition: dbtoconf.py:99
xdata::UnsignedInteger32 nbSubProcesses_
xdata::String * stateName()
Definition: StateMachine.h:69
std::string & getChart()
Definition: CPUStat.h:43
xdata::String configString_
unsigned int ls
Definition: FWEPWrapper.h:43
int hasStarted()
Definition: Vulture.cc:215
void mDiv(std::ostringstream *out, std::string name)
Definition: procUtils.cc:354
void uptime(std::ostringstream *out)
Definition: procUtils.cc:309
static const unsigned int logRingSize_
evf::FUEventProcessor::XDAQ_INSTANTIATOR ( )

Member Data Documentation

unsigned long long evf::FUEventProcessor::allProcStats_
private

Definition at line 302 of file FUEventProcessor.h.

Referenced by enabling(), and summarize().

int evf::FUEventProcessor::anonymousPipe_[2]
private

Definition at line 270 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), receivingAndMonitor(), and subWeb().

xdata::InfoSpace* evf::FUEventProcessor::applicationInfoSpace_
private
toolbox::task::ActionSignature* evf::FUEventProcessor::asReceiveMsgAndExecute_
private

Definition at line 232 of file FUEventProcessor.h.

Referenced by startReceivingLoop().

toolbox::task::ActionSignature* evf::FUEventProcessor::asReceiveMsgAndRead_
private

Definition at line 235 of file FUEventProcessor.h.

Referenced by startReceivingMonitorLoop().

toolbox::task::ActionSignature* evf::FUEventProcessor::asScalers_
private

Definition at line 262 of file FUEventProcessor.h.

Referenced by startScalersWorkLoop().

toolbox::task::ActionSignature* evf::FUEventProcessor::asSignalMonitor_
private

Definition at line 242 of file FUEventProcessor.h.

Referenced by startSignalMonitorWorkLoop().

toolbox::task::ActionSignature* evf::FUEventProcessor::asSummarize_
private

Definition at line 268 of file FUEventProcessor.h.

Referenced by startSummarizeWorkLoop().

toolbox::task::ActionSignature* evf::FUEventProcessor::asSupervisor_
private

Definition at line 239 of file FUEventProcessor.h.

Referenced by startSupervisorLoop().

xdata::Boolean evf::FUEventProcessor::autoRestartSlaves_
private

Definition at line 188 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), microState(), and supervisor().

xdata::String evf::FUEventProcessor::class_
private

Definition at line 179 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

xdata::String evf::FUEventProcessor::configString_
private

Definition at line 183 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and updater().

std::string evf::FUEventProcessor::configuration_
private

Definition at line 184 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and spotlightWebPage().

CPUStat* evf::FUEventProcessor::cpustat_
private

Definition at line 278 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), summarize(), supervisor(), and updater().

unsigned int evf::FUEventProcessor::crashesThisRun_
private

Definition at line 294 of file FUEventProcessor.h.

Referenced by enabling(), sigmon(), and supervisor().

xdata::UnsignedInteger32 evf::FUEventProcessor::crashesToDump_
private

Definition at line 297 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), and sigmon().

Css evf::FUEventProcessor::css_
private

Definition at line 210 of file FUEventProcessor.h.

Referenced by css().

xdata::Boolean evf::FUEventProcessor::datasetCounting_
private

Definition at line 306 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and FUEventProcessor().

bool evf::FUEventProcessor::edm_init_done_
private

Definition at line 292 of file FUEventProcessor.h.

Referenced by doEndRunInEDM(), enabling(), forkProcessesFromEDM(), and supervisor().

xdata::Boolean evf::FUEventProcessor::epInitialized_
private

Definition at line 182 of file FUEventProcessor.h.

Referenced by actionPerformed(), configuring(), enabling(), and FUEventProcessor().

FWEPWrapper evf::FUEventProcessor::evtProcessor_
private
xdata::Boolean evf::FUEventProcessor::exitOnError_
private

Definition at line 207 of file FUEventProcessor.h.

Referenced by receivingAndMonitor().

xdata::UnsignedInteger32 evf::FUEventProcessor::forkInEDM_
private

Definition at line 223 of file FUEventProcessor.h.

Referenced by enabling(), FUEventProcessor(), halting(), stopping(), and supervisor().

moduleweb::ForkInfoObj* evf::FUEventProcessor::forkInfoObj_
private
pthread_mutex_t evf::FUEventProcessor::forkObjLock_
private

Definition at line 290 of file FUEventProcessor.h.

Referenced by enableForkInEDM(), and FUEventProcessor().

evf::StateMachine evf::FUEventProcessor::fsm_
private
xdata::Boolean evf::FUEventProcessor::hasModuleWebRegistry_
private

Definition at line 193 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and makeStaticInfo().

xdata::Boolean evf::FUEventProcessor::hasPrescaleService_
private

Definition at line 192 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and FUEventProcessor().

xdata::Boolean evf::FUEventProcessor::hasServiceWebRegistry_
private

Definition at line 194 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and makeStaticInfo().

xdata::Boolean evf::FUEventProcessor::hasShMem_
private
xdata::Boolean evf::FUEventProcessor::iDieStatisticsGathering_
private

Definition at line 196 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and summarize().

xdata::String evf::FUEventProcessor::iDieUrl_
private

Definition at line 275 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and updater().

unsigned long long evf::FUEventProcessor::idleProcStats_
private

Definition at line 301 of file FUEventProcessor.h.

Referenced by enabling(), and summarize().

xdata::UnsignedInteger32 evf::FUEventProcessor::instance_
private

Definition at line 180 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and FUEventProcessor().

xdata::Boolean evf::FUEventProcessor::isRunNumberSetter_
private

Definition at line 195 of file FUEventProcessor.h.

Referenced by enableCommon(), enableForkInEDM(), and FUEventProcessor().

timeval evf::FUEventProcessor::lastCrashTime_
private

Definition at line 299 of file FUEventProcessor.h.

Referenced by sigmon(), and supervisor().

timeval evf::FUEventProcessor::lastProcReport_
private

Definition at line 303 of file FUEventProcessor.h.

Referenced by summarize().

Logger evf::FUEventProcessor::log_
private

Definition at line 172 of file FUEventProcessor.h.

Referenced by doEndRunInEDM(), enableForkInEDM(), restartForkInEDM(), and supervisor().

std::vector<std::string> evf::FUEventProcessor::logRing_
private

Definition at line 216 of file FUEventProcessor.h.

Referenced by localLog(), logsAsString(), and updater().

unsigned int evf::FUEventProcessor::logRingIndex_
private

Definition at line 217 of file FUEventProcessor.h.

Referenced by localLog(), logsAsString(), and updater().

const unsigned int evf::FUEventProcessor::logRingSize_ = 50
static private

Definition at line 218 of file FUEventProcessor.h.

Referenced by localLog(), and updater().

bool evf::FUEventProcessor::logWrap_
private

Definition at line 219 of file FUEventProcessor.h.

Referenced by localLog(), logsAsString(), and updater().

MsgBuf evf::FUEventProcessor::master_message_prg_
private

Definition at line 283 of file FUEventProcessor.h.

Referenced by supervisor().

MsgBuf evf::FUEventProcessor::master_message_prr_
private

Definition at line 284 of file FUEventProcessor.h.

Referenced by supervisor().

MsgBuf evf::FUEventProcessor::master_message_trr_
private

Definition at line 287 of file FUEventProcessor.h.

Referenced by summarize().

std::auto_ptr<edm::Presence> evf::FUEventProcessor::messageServicePresence_
private

Definition at line 305 of file FUEventProcessor.h.

Referenced by forkProcessesFromEDM(), FUEventProcessor(), and receiving().

xdata::InfoSpace* evf::FUEventProcessor::monitorInfoSpace_
private

Definition at line 246 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), and supervisor().

xdata::InfoSpace* evf::FUEventProcessor::monitorLegendaInfoSpace_
private

Definition at line 247 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

ModuleWebRegistry* evf::FUEventProcessor::mwrRef_
private

Definition at line 281 of file FUEventProcessor.h.

Referenced by configuring(), enableForkInEDM(), and enabling().

SubProcess* evf::FUEventProcessor::myProcess_
private
std::list<std::string> evf::FUEventProcessor::names_
private

Definition at line 274 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), and supervisor().

xdata::Serializable* evf::FUEventProcessor::nbAccepted
private

Definition at line 254 of file FUEventProcessor.h.

Referenced by supervisor(), and updater().

unsigned int evf::FUEventProcessor::nbdead_
private

Definition at line 226 of file FUEventProcessor.h.

Referenced by microState(), and supervisor().

unsigned int evf::FUEventProcessor::nblive_
private

Definition at line 225 of file FUEventProcessor.h.

Referenced by microState(), and supervisor().

xdata::Serializable* evf::FUEventProcessor::nbProcessed
private

Definition at line 253 of file FUEventProcessor.h.

Referenced by supervisor(), and updater().

xdata::UnsignedInteger32 evf::FUEventProcessor::nbSubProcesses_
private
xdata::UnsignedInteger32 evf::FUEventProcessor::nbSubProcessesReporting_
private

Definition at line 222 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), summarize(), and updater().

unsigned int evf::FUEventProcessor::nbTotalDQM_
private

Definition at line 228 of file FUEventProcessor.h.

Referenced by enabling(), microState(), and supervisor().

bool evf::FUEventProcessor::outprev_
private

Definition at line 197 of file FUEventProcessor.h.

Referenced by actionPerformed().

xdata::Boolean evf::FUEventProcessor::outPut_
private

Definition at line 186 of file FUEventProcessor.h.

Referenced by actionPerformed(), FUEventProcessor(), and makeStaticInfo().

pthread_mutex_t evf::FUEventProcessor::pickup_lock_
private

Definition at line 251 of file FUEventProcessor.h.

Referenced by enabling(), FUEventProcessor(), microState(), and supervisor().

RateStat* evf::FUEventProcessor::ratestat_
private

Definition at line 279 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and summarize().

std::string evf::FUEventProcessor::reasonForFailedState_
private
bool evf::FUEventProcessor::receiving_
private

Definition at line 233 of file FUEventProcessor.h.

Referenced by startReceivingLoop().

bool evf::FUEventProcessor::receivingM_
private

Definition at line 236 of file FUEventProcessor.h.

Referenced by startReceivingMonitorLoop().

bool evf::FUEventProcessor::restart_in_progress_
private

Definition at line 291 of file FUEventProcessor.h.

Referenced by forkProcessesFromEDM(), and restartForkInEDM().

bool evf::FUEventProcessor::rlimit_coresize_changed_
private

Definition at line 295 of file FUEventProcessor.h.

Referenced by enabling(), handleSignalSlave(), receiving(), sigmon(), and supervisor().

rlimit evf::FUEventProcessor::rlimit_coresize_default_
private

Definition at line 296 of file FUEventProcessor.h.

Referenced by enabling(), FUEventProcessor(), halting(), receiving(), stopping(), and supervisor().

xdata::UnsignedInteger32 evf::FUEventProcessor::runNumber_
private
xdata::InfoSpace* evf::FUEventProcessor::scalersInfoSpace_
private

Definition at line 257 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

xdata::InfoSpace* evf::FUEventProcessor::scalersLegendaInfoSpace_
private

Definition at line 258 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and pathNames().

unsigned int evf::FUEventProcessor::scalersUpdates_
private
sem_t* evf::FUEventProcessor::sigmon_sem_
private
bool evf::FUEventProcessor::signalMonitorActive_
private

Definition at line 243 of file FUEventProcessor.h.

Referenced by sigmon(), and startSignalMonitorWorkLoop().

MsgBuf evf::FUEventProcessor::slave_message_monitoring_
private

Definition at line 286 of file FUEventProcessor.h.

Referenced by receivingAndMonitor().

MsgBuf evf::FUEventProcessor::slave_message_prr_
private

Definition at line 285 of file FUEventProcessor.h.

Referenced by receivingAndMonitor().

xdata::UnsignedInteger32 evf::FUEventProcessor::slaveRestartDelaySecs_
private

Definition at line 189 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), and supervisor().

ShmOutputModuleRegistry* evf::FUEventProcessor::sorRef_
private

Definition at line 282 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and forkProcessesFromEDM().

std::string evf::FUEventProcessor::sourceId_
private

Definition at line 200 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

xdata::Vector<xdata::Integer> evf::FUEventProcessor::spMStates_
private

Definition at line 271 of file FUEventProcessor.h.

Referenced by configuring(), FUEventProcessor(), and supervisor().

xdata::Vector<xdata::Integer> evf::FUEventProcessor::spmStates_
private

Definition at line 272 of file FUEventProcessor.h.

Referenced by configuring(), FUEventProcessor(), and supervisor().

SquidNet evf::FUEventProcessor::squidnet_
private

Definition at line 215 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

xdata::Boolean evf::FUEventProcessor::squidPresent_
private

Definition at line 203 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), and updater().

pthread_mutex_t evf::FUEventProcessor::start_lock_
private

Definition at line 250 of file FUEventProcessor.h.

Referenced by enabling(), FUEventProcessor(), and microState().

pthread_mutex_t evf::FUEventProcessor::stop_lock_
private
std::vector<SubProcess> evf::FUEventProcessor::subs_
private
xdata::UnsignedInteger32 evf::FUEventProcessor::superSleepSec_
private

Definition at line 273 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), subWeb(), and supervisor().

bool evf::FUEventProcessor::supervising_
private

Definition at line 240 of file FUEventProcessor.h.

Referenced by startSupervisorLoop(), and updater().

std::string evf::FUEventProcessor::updaterStatic_
private

Definition at line 252 of file FUEventProcessor.h.

Referenced by makeStaticInfo(), and updater().

xdata::String evf::FUEventProcessor::url_
private

Definition at line 178 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

pid_t evf::FUEventProcessor::vp_
private

Definition at line 277 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and updater().

Vulture* evf::FUEventProcessor::vulture_
private
toolbox::task::WorkLoop* evf::FUEventProcessor::wlReceiving_
private

Definition at line 231 of file FUEventProcessor.h.

Referenced by forkProcessesFromEDM(), and startReceivingLoop().

toolbox::task::WorkLoop* evf::FUEventProcessor::wlReceivingMonitor_
private

Definition at line 234 of file FUEventProcessor.h.

Referenced by forkProcessesFromEDM(), and startReceivingMonitorLoop().

toolbox::task::WorkLoop* evf::FUEventProcessor::wlScalers_
private

Definition at line 261 of file FUEventProcessor.h.

Referenced by startScalersWorkLoop().

bool evf::FUEventProcessor::wlScalersActive_
private

Definition at line 263 of file FUEventProcessor.h.

Referenced by scalers(), startScalersWorkLoop(), summarize(), and updater().

toolbox::task::WorkLoop* evf::FUEventProcessor::wlSignalMonitor_
private

Definition at line 241 of file FUEventProcessor.h.

Referenced by startSignalMonitorWorkLoop().

toolbox::task::WorkLoop* evf::FUEventProcessor::wlSummarize_
private

Definition at line 267 of file FUEventProcessor.h.

Referenced by startSummarizeWorkLoop().

bool evf::FUEventProcessor::wlSummarizeActive_
private

Definition at line 269 of file FUEventProcessor.h.

Referenced by startSummarizeWorkLoop(), and updater().

toolbox::task::WorkLoop* evf::FUEventProcessor::wlSupervising_
private

Definition at line 238 of file FUEventProcessor.h.

Referenced by startSupervisorLoop().