CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FUEventProcessor.cc
Go to the documentation of this file.
1 //
3 // FUEventProcessor
4 // ----------------
5 //
7 
8 #include "FUEventProcessor.h"
9 #include "procUtils.h"
12 
14 
22 
29 
30 #include "toolbox/BSem.h"
31 #include "toolbox/Runtime.h"
32 #include "toolbox/stacktrace.h"
33 #include "toolbox/net/Utils.h"
34 
35 #include <boost/tokenizer.hpp>
36 
37 #include "xcept/tools.h"
38 #include "xgi/Method.h"
39 
40 #include "cgicc/CgiDefs.h"
41 #include "cgicc/Cgicc.h"
42 #include "cgicc/FormEntry.h"
43 
44 
45 #include <sys/wait.h>
46 #include <sys/utsname.h>
47 #include <sys/mman.h>
48 #include <signal.h>
49 
50 #include <typeinfo>
51 #include <stdlib.h>
52 #include <stdio.h>
53 #include <errno.h>
54 
55 
56 using namespace evf;
57 using namespace cgicc;
58 namespace toolbox {
59  namespace mem {
60  extern toolbox::BSem * _s_mutex_ptr_; // global toolbox::mem binary semaphore, declared here so enabling() can re-create it in a child forked via SubProcess::forkNew() (non-forkInEDM path)
61  }
62 }
63 
64 //signal handler (global)
65 namespace evf {
68  void evfep_sighandler(int sig, siginfo_t* info, void* c)
69  {
71  FUInstancePtr_->handleSignalSlave(sig, info, c);
72  }
73  void evfep_alarmhandler(int sig, siginfo_t* info, void* c)
74  {
75  if (evfep_raised_signal) {
76  signal(evfep_raised_signal,SIG_DFL);
77  raise(evfep_raised_signal);
78  }
79  }
80 }
81 
83 // construction/destruction
85 
86 //______________________________________________________________________________
87 FUEventProcessor::FUEventProcessor(xdaq::ApplicationStub *s)
88  : xdaq::Application(s)
89  , fsm_(this)
90  , log_(getApplicationLogger())
91  , evtProcessor_(log_, getApplicationDescriptor()->getInstance())
92  , runNumber_(0)
93  , epInitialized_(false)
94  , outPut_(true)
95  , autoRestartSlaves_(false)
96  , slaveRestartDelaySecs_(10)
97  , hasShMem_(true)
98  , hasPrescaleService_(true)
99  , hasModuleWebRegistry_(true)
100  , hasServiceWebRegistry_(true)
101  , isRunNumberSetter_(true)
102  , iDieStatisticsGathering_(false)
103  , outprev_(true)
104  , exitOnError_(true)
105  , reasonForFailedState_()
106  , squidnet_(3128,"http://localhost:8000/RELEASE-NOTES.txt")
107  , logRing_(logRingSize_)
108  , logRingIndex_(logRingSize_)
109  , logWrap_(false)
110  , nbSubProcesses_(0)
111  , nbSubProcessesReporting_(0)
112  , forkInEDM_(true)
113  , nblive_(0)
114  , nbdead_(0)
115  , nbTotalDQM_(0)
116  , wlReceiving_(0)
117  , asReceiveMsgAndExecute_(0)
118  , receiving_(false)
119  , wlReceivingMonitor_(0)
120  , asReceiveMsgAndRead_(0)
121  , receivingM_(false)
122  , myProcess_(0)
123  , wlSupervising_(0)
124  , asSupervisor_(0)
125  , supervising_(false)
126  , wlSignalMonitor_(0)
127  , asSignalMonitor_(0)
128  , signalMonitorActive_(false)
129  , monitorInfoSpace_(0)
130  , monitorLegendaInfoSpace_(0)
131  , applicationInfoSpace_(0)
132  , nbProcessed(0)
133  , nbAccepted(0)
134  , scalersInfoSpace_(0)
135  , scalersLegendaInfoSpace_(0)
136  , wlScalers_(0)
137  , asScalers_(0)
138  , wlScalersActive_(false)
139  , scalersUpdates_(0)
140  , wlSummarize_(0)
141  , asSummarize_(0)
142  , wlSummarizeActive_(false)
143  , superSleepSec_(1)
144  , iDieUrl_("none")
145  , vulture_(0)
146  , vp_(0)
147  , cpustat_(0)
148  , ratestat_(0)
149  , mwrRef_(nullptr)
150  , sorRef_(nullptr)
151  , master_message_prg_(0,MSQM_MESSAGE_TYPE_PRG)
152  , master_message_prr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_PRR)
153  , slave_message_prr_(sizeof(prg),MSQS_MESSAGE_TYPE_PRR)
154  , master_message_trr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_TRR)
155  , edm_init_done_(true)
156  , crashesThisRun_(false)
157  , rlimit_coresize_changed_(false)
158  , crashesToDump_(2)
159  , sigmon_sem_(0)
160  , datasetCounting_(true)
161 {
162  using namespace utils;
163 
164  FUInstancePtr_=this;
165 
166  names_.push_back("nbProcessed" );
167  names_.push_back("nbAccepted" );
168  names_.push_back("epMacroStateInt");
169  names_.push_back("epMicroStateInt");
170  // create pipe for web communication
171  int retpipe = pipe(anonymousPipe_);
172  if(retpipe != 0)
173  LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to create pipe");
174  // check squid presence
176  //pass application parameters to FWEPWrapper
177  evtProcessor_.setAppDesc(getApplicationDescriptor());
178  evtProcessor_.setAppCtxt(getApplicationContext());
179  // bind relevant callbacks to finite state machine
181 
182  //set sourceId_
183  url_ =
184  getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
185  getApplicationDescriptor()->getURN();
186  class_ =getApplicationDescriptor()->getClassName();
187  instance_=getApplicationDescriptor()->getInstance();
188  sourceId_=class_.toString()+"_"+instance_.toString();
189  LOG4CPLUS_INFO(getApplicationLogger(),sourceId_ <<" constructor" );
190  LOG4CPLUS_INFO(getApplicationLogger(),"CMSSW_BASE:"<<getenv("CMSSW_BASE"));
191 
192  getApplicationDescriptor()->setAttribute("icon", "/evf/images/epicon.jpg");
193 
194  xdata::InfoSpace *ispace = getApplicationInfoSpace();
195  applicationInfoSpace_ = ispace;
196 
197  // default configuration
198  ispace->fireItemAvailable("parameterSet", &configString_ );
199  ispace->fireItemAvailable("epInitialized", &epInitialized_ );
200  ispace->fireItemAvailable("stateName", fsm_.stateName() );
201  ispace->fireItemAvailable("runNumber", &runNumber_ );
202  ispace->fireItemAvailable("outputEnabled", &outPut_ );
203 
204  ispace->fireItemAvailable("hasSharedMemory", &hasShMem_);
205  ispace->fireItemAvailable("hasPrescaleService", &hasPrescaleService_ );
206  ispace->fireItemAvailable("hasModuleWebRegistry", &hasModuleWebRegistry_ );
207  ispace->fireItemAvailable("hasServiceWebRegistry", &hasServiceWebRegistry_ );
208  ispace->fireItemAvailable("isRunNumberSetter", &isRunNumberSetter_ );
209  ispace->fireItemAvailable("iDieStatisticsGathering", &iDieStatisticsGathering_);
210  ispace->fireItemAvailable("rcmsStateListener", fsm_.rcmsStateListener() );
211  ispace->fireItemAvailable("foundRcmsStateListener",fsm_.foundRcmsStateListener());
212  ispace->fireItemAvailable("nbSubProcesses", &nbSubProcesses_ );
213  ispace->fireItemAvailable("nbSubProcessesReporting",&nbSubProcessesReporting_ );
214  ispace->fireItemAvailable("forkInEDM" ,&forkInEDM_ );
215  ispace->fireItemAvailable("superSleepSec", &superSleepSec_ );
216  ispace->fireItemAvailable("autoRestartSlaves", &autoRestartSlaves_ );
217  ispace->fireItemAvailable("slaveRestartDelaySecs",&slaveRestartDelaySecs_ );
218  ispace->fireItemAvailable("iDieUrl", &iDieUrl_ );
219  ispace->fireItemAvailable("crashesToDump" ,&crashesToDump_ );
220  ispace->fireItemAvailable("datasetCounting" ,&datasetCounting_ );
221 
222  // Add infospace listeners for exporting data values
223  getApplicationInfoSpace()->addItemChangedListener("parameterSet", this);
224  getApplicationInfoSpace()->addItemChangedListener("outputEnabled", this);
225 
226  // findRcmsStateListener
228 
229  // initialize monitoring infospace
230 
231  std::string monInfoSpaceName="evf-eventprocessor-status-monitor";
232  toolbox::net::URN urn = this->createQualifiedInfoSpace(monInfoSpaceName);
233  monitorInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
234 
235  std::string monLegendaInfoSpaceName="evf-eventprocessor-status-legenda";
236  urn = this->createQualifiedInfoSpace(monLegendaInfoSpaceName);
237  monitorLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
238 
239 
240  monitorInfoSpace_->fireItemAvailable("url", &url_ );
241  monitorInfoSpace_->fireItemAvailable("class", &class_ );
242  monitorInfoSpace_->fireItemAvailable("instance", &instance_ );
243  monitorInfoSpace_->fireItemAvailable("runNumber", &runNumber_ );
244  monitorInfoSpace_->fireItemAvailable("stateName", fsm_.stateName());
245 
246  monitorInfoSpace_->fireItemAvailable("squidPresent", &squidPresent_ );
247 
248  std::string scalersInfoSpaceName="evf-eventprocessor-scalers-monitor";
249  urn = this->createQualifiedInfoSpace(scalersInfoSpaceName);
250  scalersInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
251 
252  std::string scalersLegendaInfoSpaceName="evf-eventprocessor-scalers-legenda";
253  urn = this->createQualifiedInfoSpace(scalersLegendaInfoSpaceName);
254  scalersLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
255 
256 
257 
259  scalersInfoSpace_->fireItemAvailable("instance", &instance_);
260 
264 
265  //subprocess state vectors for MP
266  monitorInfoSpace_->fireItemAvailable("epMacroStateInt", &spMStates_);
267  monitorInfoSpace_->fireItemAvailable("epMicroStateInt", &spmStates_);
268 
269  // Bind web interface
270  xgi::bind(this, &FUEventProcessor::css, "styles.css");
271  xgi::bind(this, &FUEventProcessor::defaultWebPage, "Default" );
272  xgi::bind(this, &FUEventProcessor::spotlightWebPage, "Spotlight" );
273  xgi::bind(this, &FUEventProcessor::scalersWeb, "scalersWeb");
274  xgi::bind(this, &FUEventProcessor::pathNames, "pathNames" );
275  xgi::bind(this, &FUEventProcessor::subWeb, "SubWeb" );
276  xgi::bind(this, &FUEventProcessor::getSlavePids, "getSlavePids");
277  xgi::bind(this, &FUEventProcessor::moduleWeb, "moduleWeb" );
278  xgi::bind(this, &FUEventProcessor::serviceWeb, "serviceWeb");
279  xgi::bind(this, &FUEventProcessor::microState, "microState");
280  xgi::bind(this, &FUEventProcessor::updater, "updater" );
281  xgi::bind(this, &FUEventProcessor::procStat, "procStat" );
282 
283  // instantiate the plugin manager, not referenced here after!
284 
286 
287  //create Message Service thread and pass ownership to auto_ptr that we destroy before fork
288  try{
289  LOG4CPLUS_DEBUG(getApplicationLogger(),
290  "Trying to create message service presence ");
292  if(pf != 0) {
293  messageServicePresence_= pf->makePresence("MessageServicePresence");
294  }
295  else {
296  LOG4CPLUS_ERROR(getApplicationLogger(),
297  "Unable to create message service presence ");
298  }
299  }
300  catch(...) {
301  LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
302  }
304 
305  typedef std::set<xdaq::ApplicationDescriptor*> AppDescSet_t;
306  typedef AppDescSet_t::iterator AppDescIter_t;
307 
308  AppDescSet_t rcms=
309  getApplicationContext()->getDefaultZone()->
310  getApplicationDescriptors("RCMSStateListener");
311  if(rcms.size()==0)
312  {
313  LOG4CPLUS_WARN(getApplicationLogger(),
314  "MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
315  // localLog("-W- MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
316  }
317  else
318  {
319  AppDescIter_t it = rcms.begin();
320  evtProcessor_.setRcms(*it);
321  }
322  pthread_mutex_init(&start_lock_,0);
323  pthread_mutex_init(&stop_lock_,0);
324  pthread_mutex_init(&pickup_lock_,0);
325 
326  forkInfoObj_=nullptr;
327  pthread_mutex_init(&forkObjLock_,0);
328  makeStaticInfo();
330 
331  if(vulture_==0) vulture_ = new Vulture(true);
332 
334 
335  AppDescSet_t setOfiDie=
336  getApplicationContext()->getDefaultZone()->
337  getApplicationDescriptors("evf::iDie");
338 
339  for (AppDescIter_t it=setOfiDie.begin();it!=setOfiDie.end();++it)
340  if ((*it)->getInstance()==0) // there has to be only one instance of iDie
341  iDieUrl_ = (*it)->getContextDescriptor()->getURL() + "/" + (*it)->getURN();
342 
343  //save default core file size
344  getrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
345 
346  //prepare IPC semaphore for getting the workloop waked up on signal caught in slaves
347  #ifdef linux
348  if (sigmon_sem_==0) {
349  sigmon_sem_ = (sem_t*)mmap(NULL, sizeof(sem_t),
350  PROT_READ | PROT_WRITE,
351  MAP_ANONYMOUS | MAP_SHARED, 0, 0);
352  if (!sigmon_sem_) {
353  perror("mmap error\n");
354  std::cout << "mmap error"<<std::endl;
355  }
356  else
357  sem_init(sigmon_sem_,true,0);
358  }
359  #endif
360 }
361 //___________here ends the *huge* constructor___________________________________
362 
363 
364 //______________________________________________________________________________
366 {
367  // no longer needed since the wrapper is a member of the class and one can rely on
368  // implicit destructor - to be revised - at any rate the most common exit path is via "kill"...
369  // if (evtProcessor_) delete evtProcessor_;
370  if(vulture_ != 0) delete vulture_;
371 }
372 
373 
375 // implementation of member functions
377 
378 
379 //______________________________________________________________________________
380 bool FUEventProcessor::configuring(toolbox::task::WorkLoop* wl)
381 {
382 // std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
383 // << ((instance_.value_==0) ? 0x8 : 0) << " "
384 // << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
385 // << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
386 // << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
387  unsigned short smap
388  = (datasetCounting_.value_ ? 0x20 : 0 )
389  + ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
390  + (((instance_.value_%80)==0) ? 0x8 : 0) // have at least one legend per slice
391  + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
392  + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
393  + (hasPrescaleService_.value_ ? 0x1 : 0);
394  if(nbSubProcesses_.value_==0)
395  {
396  spMStates_.setSize(1);
397  spmStates_.setSize(1);
398  }
399  else
400  {
401  spMStates_.setSize(nbSubProcesses_.value_);
402  spmStates_.setSize(nbSubProcesses_.value_);
403  for(unsigned int i = 0; i < spMStates_.size(); i++)
404  {
406  spmStates_[i] = 0;
407  }
408  }
409  try {
410  LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ...");
411  std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
412  epInitialized_=true;
413  if(evtProcessor_)
414  {
415  //get ref of mwr
418  // moved to wrapper class
423  if(cpustat_) {delete cpustat_; cpustat_=0;}
425  nbSubProcesses_.value_,
426  instance_.value_,
427  iDieUrl_.value_);
428  if(ratestat_) {delete ratestat_; ratestat_=0;}
429  ratestat_ = new RateStat(iDieUrl_.value_);
430  if(iDieStatisticsGathering_.value_)
431  {
432  try
433  {
435  xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
436  if(legenda !=0)
437  {
438  std::string slegenda = ((xdata::String*)legenda)->value_;
439  ratestat_->sendLegenda(slegenda);
440  }
441  if (sorRef_ && datasetCounting_.value_)
442  {
443  xdata::String dsLegenda = sorRef_->getDatasetCSV();
444  if (dsLegenda.value_.size())
445  ratestat_->sendAuxLegenda(dsLegenda);
446  }
447  }
448  catch(evf::Exception &e)
449  {
450  LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
451  << e.what());
452  }
453  catch (xcept::Exception& e) {
454  LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda."
455  << e.what());
456  }
457  }
458 
459  fsm_.fireEvent("ConfigureDone",this);
460  LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!");
461  localLog("-I- Configuration completed");
462 
463  }
464  }
465  catch (xcept::Exception &e) {
466  reasonForFailedState_ = "configuring FAILED: " + (std::string)e.what();
469  }
470  catch(cms::Exception &e) {
474  }
475  catch(std::exception &e) {
476  reasonForFailedState_ = e.what();
479  }
480  catch(...) {
481  fsm_.fireFailed("Unknown Exception",this);
482  }
483 
484  if(vulture_!=0 && vp_ == 0) vp_ = vulture_->makeProcess();
485 
486  return false;
487 }
488 
489 
490 
491 
492 //______________________________________________________________________________
493 bool FUEventProcessor::enabling(toolbox::task::WorkLoop* wl)
494 {
495  nbTotalDQM_ = 0;
496  scalersUpdates_ = 0;
497  idleProcStats_ = 0;
498  allProcStats_ = 0;
499 // std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
500 // << ((instance_.value_==0) ? 0x8 : 0) << " "
501 // << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
502 // << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
503 // << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
504  unsigned short smap
505  = (datasetCounting_.value_ ? 0x20 : 0 )
506  + ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
507  + (((instance_.value_%80)==0) ? 0x8 : 0) // have at least one legend per slice
508  + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
509  + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
510  + (hasPrescaleService_.value_ ? 0x1 : 0);
511 
512  LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling...");
513 
514  //reset core limit size
516  setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
518  crashesThisRun_=0;
519  //recreate signal monitor sem
520  sem_destroy(sigmon_sem_);
521  sem_init(sigmon_sem_,true,0);
522 
523  if(!epInitialized_){
525  }
526  std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
527 
530 
531  if(!epInitialized_){
534  if(cpustat_) {delete cpustat_; cpustat_=0;}
536  nbSubProcesses_.value_,
537  instance_.value_,
538  iDieUrl_.value_);
539  if(ratestat_) {delete ratestat_; ratestat_=0;}
540  ratestat_ = new RateStat(iDieUrl_.value_);
541  if(iDieStatisticsGathering_.value_)
542  {
543  try
544  {
546  xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
547  if(legenda !=0)
548  {
549  std::string slegenda = ((xdata::String*)legenda)->value_;
550  ratestat_->sendLegenda(slegenda);
551  }
552  if (sorRef_ && datasetCounting_.value_)
553  {
554  xdata::String dsLegenda = sorRef_->getDatasetCSV();
555  if (dsLegenda.value_.size())
556  ratestat_->sendAuxLegenda(dsLegenda);
557  }
558  }
559  catch(evf::Exception &e)
560  {
561  LOG4CPLUS_INFO(getApplicationLogger(),"could not send legenda"
562  << e.what());
563  }
564  catch (xcept::Exception& e) {
565  LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda."
566  << e.what());
567  }
568  }
569  epInitialized_ = true;
570  }
571  configuration_ = evtProcessor_.configuration(); // get it again after init has been carried out...
573  //classic appl will return here
574  if(nbSubProcesses_.value_==0) return enableClassic();
575  //protect manipulation of subprocess array
576  pthread_mutex_lock(&start_lock_);
577  pthread_mutex_lock(&pickup_lock_);
578  subs_.clear();
579  subs_.resize(nbSubProcesses_.value_); // this should not be necessary
580  pid_t retval = -1;
581 
582  for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
583  {
584  subs_[i]=SubProcess(i,retval); //this will replace all the scattered variables
585  }
586  pthread_mutex_unlock(&pickup_lock_);
587 
588  pthread_mutex_unlock(&start_lock_);
589 
590  //set expected number of child EP's for the Init message(s) sent to the SM
591  try {
592  if (sorRef_) {
593  unsigned int nbExpectedEPWriters = nbSubProcesses_.value_;
594  if (nbExpectedEPWriters==0) nbExpectedEPWriters=1;//master instance processing
595  std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
596  for (unsigned int i=0;i<shmOutputs.size();i++) {
597  shmOutputs[i]->setNExpectedEPs(nbExpectedEPWriters);
598  }
599  }
600  }
601  catch (...)
602  {
603  LOG4CPLUS_ERROR(getApplicationLogger(),"Thrown Exception while setting nExpectedEPs in shmOutputs");
604  }
605 
606  //use new method if configured
607  edm_init_done_=true;
608  if (forkInEDM_.value_) {
609  edm_init_done_=false;
610  enableForkInEDM();
611  }
612  else
613  for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
614  {
615  retval = subs_[i].forkNew();
616  if(retval==0)
617  {
618  myProcess_ = &subs_[i];
619  // dirty hack: delete/recreate global binary semaphore for later use in child
621  toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
622  int retval = pthread_mutex_destroy(&stop_lock_);
623  if(retval != 0) perror("error");
624  retval = pthread_mutex_init(&stop_lock_,0);
625  if(retval != 0) perror("error");
627 
628  return enableMPEPSlave();
629  // the loop is broken in the child
630  }
631  }
632 
633  if (forkInEDM_.value_) {
634 
636  while (!edm_init_done_) {
637  usleep(10000);
638  st = evtProcessor_->getState();
640  }
641  st = evtProcessor_->getState();
642  //error handling: EP must fork during sRunning
644  reasonForFailedState_ = std::string("Master edm::EventProcessor in state ") + evtProcessor_->stateName(st);
647  return false;
648  }
649 
650  sleep(1);
652  startSignalMonitorWorkLoop();//only with new forking
653  vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
654 
655  //enable after we are done with conditions loading and forking
656  LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
657  fsm_.fireEvent("EnableDone",this);
658  localLog("-I- Start completed");
659  return false;
660  }
661 
663  vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
664  LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
665  fsm_.fireEvent("EnableDone",this);
666  localLog("-I- Start completed");
667  return false;
668 }
669 
671  //taking care that EP in master is in stoppable state
672  if (forkInfoObj_) {
673 
674  int count = 30;
675  bool waitedForEDM=false;
676  while (!edm_init_done_ && count) {
677  ::sleep(1);
678  if (count%5==0)
679  LOG4CPLUS_WARN(log_,"MASTER EP: Stopping while EP busy in beginRun. waiting " <<count<< "sec");
680  count--;
681  waitedForEDM=true;
682  }
683  //sleep a few more seconds it was early stop
684  if (waitedForEDM) sleep(5);
685 
686  //if (count==0) fsm_.fireFailed("failed to stop Master EP",this);
687 
689  return true;//we are already done
690 
691  forkInfoObj_->lock();
693  sem_post(forkInfoObj_->control_sem_);
694  forkInfoObj_->unlock();
695 
696  count = 30;
697 
699  while(count--) {
700  st = evtProcessor_->getState();
701  if (st==edm::event_processor::sRunning) ::usleep(100000);
703  break;
704  }
705  else {
706  std::ostringstream ost;
707  ost << "Master edm::EventProcessor is in state "<< evtProcessor_->stateName(st) << " while stopping";
708  LOG4CPLUS_ERROR(getApplicationLogger(),ost.str());
709  fsm_.fireFailed(ost.str(),this);
710  return false;
711  }
712  if (count%5==0 && st==edm::event_processor::sRunning && !forkInfoObj_->receivedStop_) {
713  forkInfoObj_->lock();
715  sem_post(forkInfoObj_->control_sem_);
716  forkInfoObj_->unlock();
717  LOG4CPLUS_WARN(getApplicationLogger(),
718  "Master edm::EventProcessor still running after "<< (30-count-1) << " seconds. \"sem_post\" was executed again" );
719  }
720  }
721  if (count<0) {
722  std::ostringstream ost;
724  ost << "Timeout waiting for Master edm::EventProcessor to go stopping state "
725  << evtProcessor_->stateName(st) << ": input source did not receive stop signal!";
726  else
727  ost << "Timeout waiting for Master edm::EventProcessor to go stopping state "<<evtProcessor_->stateName(st);
728  LOG4CPLUS_ERROR(getApplicationLogger(),ost.str());
729  fsm_.fireFailed(ost.str(),this);
730  return false;
731  }
732  }
733  return true;
734 }
735 
736 //______________________________________________________________________________
737 bool FUEventProcessor::stopping(toolbox::task::WorkLoop* wl)
738 {
739  setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
740  if(nbSubProcesses_.value_!=0) {
742  if (forkInEDM_.value_) {
743  //only in new forking for now
744  sem_post(sigmon_sem_);
745  if (!doEndRunInEDM())
746  return false;
747  }
748  }
749  vulture_->stop();
750 
751  if (forkInEDM_.value_) {
752  //shared memory was already disconnected in master
753  bool tmpHasShMem_=hasShMem_;
754  hasShMem_=false;
755  stopClassic();
756  hasShMem_=tmpHasShMem_;
757  return false;
758  }
759  stopClassic();
760  return false;
761 }
762 
763 
764 //______________________________________________________________________________
765 bool FUEventProcessor::halting(toolbox::task::WorkLoop* wl)
766 {
767  LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ...");
768  setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
769  if(nbSubProcesses_.value_!=0) {
771  if (forkInEDM_.value_) {
772  sem_post(sigmon_sem_);
773  if (!doEndRunInEDM())
774  return false;
775 
776  }
777  }
778  try{
780  //cleanup forking variables
781  if (forkInfoObj_) {
782  delete forkInfoObj_;
783  forkInfoObj_=0;
784  }
785  }
786  catch (evf::Exception &e) {
787  reasonForFailedState_ = "halting FAILED: " + (std::string)e.what();
790  }
791  // if(hasShMem_) detachDqmFromShm();
792 
793  LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!");
794  fsm_.fireEvent("HaltDone",this);
795 
796  localLog("-I- Halt completed");
797  return false;
798 }
799 
800 
801 //______________________________________________________________________________
802 xoap::MessageReference FUEventProcessor::fsmCallback(xoap::MessageReference msg)
804 {
805  return fsm_.commandCallback(msg);
806 }
807 
808 
809 //______________________________________________________________________________
811 {
812 
813  if (e.type()=="ItemChangedEvent" && fsm_.stateName()->toString()!="Halted") {
814  std::string item = dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
815 
816  if ( item == "parameterSet") {
817  LOG4CPLUS_WARN(getApplicationLogger(),
818  "HLT Menu changed, force reinitialization of EventProcessor");
819  epInitialized_ = false;
820  }
821  if ( item == "outputEnabled") {
822  if(outprev_ != outPut_) {
823  LOG4CPLUS_WARN(getApplicationLogger(),
824  (outprev_ ? "Disabling " : "Enabling ")<<"global output");
826  outprev_ = outPut_;
827  }
828  }
829  if (item == "globalInputPrescale") {
830  LOG4CPLUS_WARN(getApplicationLogger(),
831  "Setting global input prescale has no effect "
832  <<"in this version of the code");
833  }
834  if ( item == "globalOutputPrescale") {
835  LOG4CPLUS_WARN(getApplicationLogger(),
836  "Setting global output prescale has no effect "
837  <<"in this version of the code");
838  }
839  }
840 
841 }
842 
843 //______________________________________________________________________________
845 {
846  for (unsigned int i=0;i<subs_.size();i++)
847  {
848  if (i!=0) *out << ",";
849  *out << subs_[i].pid();
850  }
851 }
852 //______________________________________________________________________________
854 {
855  using namespace cgicc;
856  pid_t pid = 0;
857  std::ostringstream ost;
858  ost << "&";
859 
860  Cgicc cgi(in);
861  internal::MyCgi *mycgi = (internal::MyCgi*)in;
862  for(std::map<std::string, std::string, std::less<std::string> >::iterator mit =
863  mycgi->getEnvironment().begin();
864  mit != mycgi->getEnvironment().end(); mit++)
865  ost << mit->first << "%" << mit->second << ";";
866  std::vector<FormEntry> els = cgi.getElements() ;
867  std::vector<FormEntry> el1;
868  cgi.getElement("method",el1);
869  std::vector<FormEntry> el2;
870  cgi.getElement("process",el2);
871  if(el1.size()!=0) {
872  std::string meth = el1[0].getValue();
873  if(el2.size()!=0) {
874  unsigned int i = 0;
875  std::string mod = el2[0].getValue();
876  pid = atoi(mod.c_str()); // get the process id to be polled
877  for(; i < subs_.size(); i++)
878  if(subs_[i].pid()==pid) break;
879  if(i>=subs_.size()){ //process was not found, let the browser know
880  *out << "ERROR 404 : Process " << pid << " Not Found !" << std::endl;
881  return;
882  }
883  if(subs_[i].alive() != 1){
884  *out << "ERROR 405 : Process " << pid << " Not Alive !" << std::endl;
885  return;
886  }
887  MsgBuf msg1(meth.length()+ost.str().length()+1,MSQM_MESSAGE_TYPE_WEB);
888  strncpy(msg1->mtext,meth.c_str(),meth.length());
889  strncpy(msg1->mtext+meth.length(),ost.str().c_str(),ost.str().length());
890  subs_[i].post(msg1,true);
891  unsigned int keep_supersleep_original_value = superSleepSec_.value_;
892  superSleepSec_.value_=10*keep_supersleep_original_value;
894  bool done = false;
895  std::vector<char *>pieces;
896  while(!done){
897  unsigned long retval1 = subs_[i].rcvNonBlocking(msg,true);
898  if(retval1 == MSGQ_MESSAGE_TYPE_RANGE){
899  ::sleep(1);
900  continue;
901  }
902  unsigned int nbytes = atoi(msg->mtext);
903  if(nbytes < MAX_PIPE_BUFFER_SIZE) done = true; // this will break the while loop
904  char *buf= new char[nbytes];
905  ssize_t retval = read(anonymousPipe_[PIPE_READ],buf,nbytes);
906  if(retval!=nbytes) std::cout
907  << "CAREFUL HERE, read less bytes than expected from pipe in subWeb" << std::endl;
908  pieces.push_back(buf);
909  }
910  superSleepSec_.value_=keep_supersleep_original_value;
911  for(unsigned int j = 0; j < pieces.size(); j++){
912  *out<<pieces[j]; // chain the buffers into the output strstream
913  delete[] pieces[j]; //make sure to release all buffers used for reading the pipe
914  }
915  }
916  }
917 }
918 
919 
920 //______________________________________________________________________________
923 {
924 
925 
926  *out << "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">"
927  << "<html><head><title>" << getApplicationDescriptor()->getClassName() << (nbSubProcesses_.value_ > 0 ? "MP " : " ")
928  << getApplicationDescriptor()->getInstance() << "</title>"
929  << "<meta http-equiv=\"REFRESH\" content=\"0;url=/evf/html/defaultBasePage.html\">"
930  << "</head></html>";
931 }
932 
933 
934 //______________________________________________________________________________
935 
936 
939 {
940 
941  std::string urn = getApplicationDescriptor()->getURN();
942 
943  *out << "<!-- base href=\"/" << urn
944  << "\"> -->" << std::endl;
945  *out << "<html>" << std::endl;
946  *out << "<head>" << std::endl;
947  *out << "<link type=\"text/css\" rel=\"stylesheet\"";
948  *out << " href=\"/evf/html/styles.css\"/>" << std::endl;
949  *out << "<title>" << getApplicationDescriptor()->getClassName()
950  << getApplicationDescriptor()->getInstance()
951  << " MAIN</title>" << std::endl;
952  *out << "</head>" << std::endl;
953  *out << "<body>" << std::endl;
954  *out << "<table border=\"0\" width=\"100%\">" << std::endl;
955  *out << "<tr>" << std::endl;
956  *out << " <td align=\"left\">" << std::endl;
957  *out << " <img" << std::endl;
958  *out << " align=\"middle\"" << std::endl;
959  *out << " src=\"/evf/images/spoticon.jpg\"" << std::endl;
960  *out << " alt=\"main\"" << std::endl;
961  *out << " width=\"64\"" << std::endl;
962  *out << " height=\"64\"" << std::endl;
963  *out << " border=\"\"/>" << std::endl;
964  *out << " <b>" << std::endl;
965  *out << getApplicationDescriptor()->getClassName()
966  << getApplicationDescriptor()->getInstance() << std::endl;
967  *out << " " << fsm_.stateName()->toString() << std::endl;
968  *out << " </b>" << std::endl;
969  *out << " </td>" << std::endl;
970  *out << " <td width=\"32\">" << std::endl;
971  *out << " <a href=\"/urn:xdaq-application:lid=3\">" << std::endl;
972  *out << " <img" << std::endl;
973  *out << " align=\"middle\"" << std::endl;
974  *out << " src=\"/hyperdaq/images/HyperDAQ.jpg\"" << std::endl;
975  *out << " alt=\"HyperDAQ\"" << std::endl;
976  *out << " width=\"32\"" << std::endl;
977  *out << " height=\"32\"" << std::endl;
978  *out << " border=\"\"/>" << std::endl;
979  *out << " </a>" << std::endl;
980  *out << " </td>" << std::endl;
981  *out << " <td width=\"32\">" << std::endl;
982  *out << " </td>" << std::endl;
983  *out << " <td width=\"32\">" << std::endl;
984  *out << " <a href=\"/" << urn << "/\">" << std::endl;
985  *out << " <img" << std::endl;
986  *out << " align=\"middle\"" << std::endl;
987  *out << " src=\"/evf/images/epicon.jpg\"" << std::endl;
988  *out << " alt=\"main\"" << std::endl;
989  *out << " width=\"32\"" << std::endl;
990  *out << " height=\"32\"" << std::endl;
991  *out << " border=\"\"/>" << std::endl;
992  *out << " </a>" << std::endl;
993  *out << " </td>" << std::endl;
994  *out << "</tr>" << std::endl;
995  *out << "</table>" << std::endl;
996 
997  *out << "<hr/>" << std::endl;
998 
999  std::ostringstream ost;
1000  if(myProcess_)
1001  ost << "/SubWeb?process=" << getpid() << "&method=moduleWeb&";
1002  else
1003  ost << "/moduleWeb?";
1004  urn += ost.str();
1005  if(evtProcessor_ && (myProcess_ || nbSubProcesses_.value_==0))
1007  else if(evtProcessor_)
1009  else
1010  *out << "<td>HLT Unconfigured</td>" << std::endl;
1011  *out << "</table>" << std::endl;
1012 
1013  *out << "<br><textarea rows=" << 10 << " cols=80 scroll=yes>" << std::endl;
1014  *out << configuration_ << std::endl;
1015  *out << "</textarea><P>" << std::endl;
1016 
1017  *out << "</body>" << std::endl;
1018  *out << "</html>" << std::endl;
1019 
1020 
1021 }
1024 {
1025 
1026  out->getHTTPResponseHeader().addHeader( "Content-Type",
1027  "application/octet-stream" );
1028  out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
1029  "binary" );
1030  if(evtProcessor_ != 0){
1032  }
1033 }
1034 
1037 {
1038 
1039  if(evtProcessor_ != 0){
1040  xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
1041  if(legenda !=0){
1042  std::string slegenda = ((xdata::String*)legenda)->value_;
1043  *out << slegenda << std::endl;
1044  }
1045  }
1046 }
1047 
1048 
1050 {
1051  std::string errmsg;
1052  try {
1055  edm::Service<FUShmDQMOutputService>()->setAttachToShm();
1056  }
1057  catch (cms::Exception& e) {
1058  errmsg = "Failed to set to attach DQM service to shared memory: " + (std::string)e.what();
1059  }
1060  if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
1061 }
1062 
1063 
1065 {
1066  std::string errmsg;
1067  bool success = false;
1068  try {
1071  success = edm::Service<FUShmDQMOutputService>()->attachToShm();
1072  if (!success) errmsg = "Failed to attach DQM service to shared memory";
1073  }
1074  catch (cms::Exception& e) {
1075  errmsg = "Failed to attach DQM service to shared memory: " + (std::string)e.what();
1076  }
1077  if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
1078 }
1079 
1080 
1081 
1083 {
1084  std::string errmsg;
1085  bool success = false;
1086  try {
1089  success = edm::Service<FUShmDQMOutputService>()->detachFromShm();
1090  if (!success) errmsg = "Failed to detach DQM service from shared memory";
1091  }
1092  catch (cms::Exception& e) {
1093  errmsg = "Failed to detach DQM service from shared memory: " + (std::string)e.what();
1094  }
1095  if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
1096 }
1097 
1098 
1100 {
1101  std::ostringstream oss;
1102  if(logWrap_)
1103  {
1104  for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
1105  oss << logRing_[i] << std::endl;
1106  for(unsigned int i = 0; i < logRingIndex_; i++)
1107  oss << logRing_[i] << std::endl;
1108  }
1109  else
1110  for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
1111  oss << logRing_[i] << std::endl;
1112 
1113  return oss.str();
1114 }
1115 
1117 {
1118  timeval tv;
1119 
1120  gettimeofday(&tv,0);
1121  tm *uptm = localtime(&tv.tv_sec);
1122  char datestring[256];
1123  strftime(datestring, sizeof(datestring),"%c", uptm);
1124 
1125  if(logRingIndex_ == 0){logWrap_ = true; logRingIndex_ = logRingSize_;}
1126  logRingIndex_--;
1127  std::ostringstream timestamp;
1128  timestamp << " at " << datestring;
1129  m += timestamp.str();
1131 }
1132 
1134 {
1135  try {
1137  toolbox::task::getWorkLoopFactory()->getWorkLoop("Supervisor",
1138  "waiting");
1139  if (!wlSupervising_->isActive()) wlSupervising_->activate();
1140  asSupervisor_ = toolbox::task::bind(this,&FUEventProcessor::supervisor,
1141  "Supervisor");
1142  wlSupervising_->submit(asSupervisor_);
1143  supervising_ = true;
1144  }
1145  catch (xcept::Exception& e) {
1146  std::string msg = "Failed to start workloop 'Supervisor'.";
1147  XCEPT_RETHROW(evf::Exception,msg,e);
1148  }
1149 }
1150 
1152 {
1153  try {
1154  wlReceiving_=
1155  toolbox::task::getWorkLoopFactory()->getWorkLoop("Receiving",
1156  "waiting");
1157  if (!wlReceiving_->isActive()) wlReceiving_->activate();
1158  asReceiveMsgAndExecute_ = toolbox::task::bind(this,&FUEventProcessor::receiving,
1159  "Receiving");
1161  receiving_ = true;
1162  }
1163  catch (xcept::Exception& e) {
1164  std::string msg = "Failed to start workloop 'Receiving'.";
1165  XCEPT_RETHROW(evf::Exception,msg,e);
1166  }
1167 }
1169 {
1170  try {
1172  toolbox::task::getWorkLoopFactory()->getWorkLoop("ReceivingM",
1173  "waiting");
1174  if (!wlReceivingMonitor_->isActive())
1175  wlReceivingMonitor_->activate();
1177  toolbox::task::bind(this,&FUEventProcessor::receivingAndMonitor,
1178  "ReceivingM");
1180  receivingM_ = true;
1181  }
1182  catch (xcept::Exception& e) {
1183  std::string msg = "Failed to start workloop 'ReceivingM'.";
1184  XCEPT_RETHROW(evf::Exception,msg,e);
1185  }
1186 }
1187 
1188 bool FUEventProcessor::receiving(toolbox::task::WorkLoop *)
1189 {
1190  MsgBuf msg;
1191  try{
1192  myProcess_->rcvSlave(msg,false); //will receive only messages from Master
1193  if(msg->mtype==MSQM_MESSAGE_TYPE_RLI)
1194  {
1195  rlimit rl;
1196  getrlimit(RLIMIT_CORE,&rl);
1197  rl.rlim_cur=0;
1198  setrlimit(RLIMIT_CORE,&rl);
1200  }
1201  if (msg->mtype==MSQM_MESSAGE_TYPE_RLR)
1202  {
1203  //reset coresize limit
1204  setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
1206  }
1207  if(msg->mtype==MSQM_MESSAGE_TYPE_STOP)
1208  {
1209  pthread_mutex_lock(&stop_lock_);
1210  try {
1211  fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
1212  }
1213  catch (...) {
1214  LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to go to Stopping state in slave EP, pid "
1215  << getpid() << " The state on Stop event was not consistent");
1216  }
1217 
1218  try {
1219  stopClassic(); // call the normal sequence of stopping - as this is allowed to fail provisions must be made ...@@@EM
1220  }
1221  catch (...) {
1222  LOG4CPLUS_ERROR(getApplicationLogger(),"Slave EP 'receiving' workloop: exception " << getpid());
1223  }
1224 
1225  //destroy MessageService thread before exit
1226  try{
1227  messageServicePresence_.reset();
1228  }
1229  catch(...) {
1230  LOG4CPLUS_ERROR(getApplicationLogger(),"SLAVE:Unable to destroy MessageServicePresence. pid:" << getpid() );
1231  }
1232 
1234  myProcess_->postSlave(msg1,false);
1235  pthread_mutex_unlock(&stop_lock_);
1236  fclose(stdout);
1237  fclose(stderr);
1238  _exit(EXIT_SUCCESS);
1239  }
1240  if(msg->mtype==MSQM_MESSAGE_TYPE_FSTOP)
1241  _exit(EXIT_SUCCESS);
1242  }
1243  catch(evf::Exception &e){
1244  LOG4CPLUS_ERROR(getApplicationLogger(),"Slave EP pid:" << getpid() << " receiving WorkLoop exception: "<<e.what());
1245  }
1246  return true;
1247 }
1248 
1249 bool FUEventProcessor::supervisor(toolbox::task::WorkLoop *)
1250 {
1251  pthread_mutex_lock(&stop_lock_);
1252  if(subs_.size()!=nbSubProcesses_.value_)
1253  {
1254  pthread_mutex_lock(&pickup_lock_);
1255  if(subs_.size()!=nbSubProcesses_.value_) {
1256  subs_.resize(nbSubProcesses_.value_);
1257  spMStates_.resize(nbSubProcesses_.value_);
1258  spmStates_.resize(nbSubProcesses_.value_);
1259  for(unsigned int i = 0; i < spMStates_.size(); i++)
1260  {
1262  spmStates_[i] = 0;
1263  }
1264  }
1265  pthread_mutex_unlock(&pickup_lock_);
1266  }
1267  bool running = fsm_.stateName()->toString()=="Enabled";
1268  bool stopping = fsm_.stateName()->toString()=="stopping";
1269  for(unsigned int i = 0; i < subs_.size(); i++)
1270  {
1271  if(subs_[i].alive()==-1000) continue;
1272  int sl;
1273  pid_t sub_pid = subs_[i].pid();
1274  pid_t killedOrNot = waitpid(sub_pid,&sl,WNOHANG);
1275 
1276  if(killedOrNot && killedOrNot==sub_pid) {
1277  pthread_mutex_lock(&pickup_lock_);
1278  //check if out of range or recreated (enable can clear vector)
1279  if (i<subs_.size() && subs_[i].alive()!=-1000) {
1280  subs_[i].setStatus((WIFEXITED(sl) != 0 ? 0 : -1));
1281  std::ostringstream ost;
1282  if(subs_[i].alive()==0) ost << " process exited with status " << WEXITSTATUS(sl);
1283  else if(WIFSIGNALED(sl)!=0) {
1284  ost << " process terminated with signal " << WTERMSIG(sl);
1285  }
1286  else ost << " process stopped ";
1287  //report unexpected slave exit in stop
1288  //if (stopping && (WEXITSTATUS(sl)!=0 || WIFSIGNALED(sl)!=0)) {
1289  // LOG4CPLUS_WARN(getApplicationLogger(),ost.str() << ", slave pid:"<<getpid());
1290  //}
1291  subs_[i].countdown()=slaveRestartDelaySecs_.value_;
1292  subs_[i].setReasonForFailed(ost.str());
1294  spmStates_[i] = 0;
1295  std::ostringstream ost1;
1296  ost1 << "-E- Slave " << subs_[i].pid() << ost.str();
1297  localLog(ost1.str());
1298  if(!autoRestartSlaves_.value_) subs_[i].disconnect();
1299  }
1300  pthread_mutex_unlock(&pickup_lock_);
1301  }
1302  }
1303  pthread_mutex_unlock(&stop_lock_);
1304  if(stopping) return true; // if in stopping we are done
1305 
1306  // check if we need to reset core dumps (15 min after last one)
1307  if (running && rlimit_coresize_changed_) {
1308  timeval newtv;
1309  gettimeofday(&newtv,0);
1310  int delta = newtv.tv_sec-lastCrashTime_.tv_sec;
1311  if (delta>60*15) {
1312  std::ostringstream ostr;
1313  ostr << " No more slave EP crashes on this machine in last 15 min. resetting core size limits";
1314  std::cout << ostr.str() << std::endl;
1315  LOG4CPLUS_INFO(getApplicationLogger(),ostr.str());
1316  setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
1317  MsgBuf master_message_rlr_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLR);
1318  for (unsigned int i = 0; i < subs_.size(); i++) {
1319  try {
1320  if (subs_[i].alive())
1321  subs_[i].post(master_message_rlr_,false);
1322  }
1323  catch (...) {}
1324  }
1326  crashesThisRun_=0;
1327  }
1328  }
1329 
1330  if(running && edm_init_done_)
1331  {
1332  // if enabled, this loop will periodically check if dead slaves countdown has expired and restart them
1333  // this is only active while running, hence, the stop lock is acquired and only released at end of loop
1334  if(autoRestartSlaves_.value_){
1335  pthread_mutex_lock(&stop_lock_); //lockout slave killing at stop while you check for restarts
1336  for(unsigned int i = 0; i < subs_.size(); i++)
1337  {
1338  if(subs_[i].alive() != 1){
1339  if(subs_[i].countdown() == 0)
1340  {
1341  if(subs_[i].restartCount()>2){
1342  LOG4CPLUS_WARN(getApplicationLogger()," Not restarting subprocess in slot " << i
1343  << " - maximum restart count reached");
1344  std::ostringstream ost1;
1345  ost1 << "-W- Dead Process in slot " << i << " reached maximum restart count";
1346  localLog(ost1.str());
1347  subs_[i].countdown()--;
1348  XCEPT_DECLARE(evf::Exception,
1349  sentinelException, ost1.str());
1350  notifyQualified("error",sentinelException);
1351  subs_[i].disconnect();
1352  continue;
1353  }
1354  subs_[i].restartCount()++;
1355  if (forkInEDM_.value_) {
1357  }
1358  else {
1359  pid_t rr = subs_[i].forkNew();
1360  if(rr==0)
1361  {
1362  myProcess_=&subs_[i];
1363  scalersUpdates_ = 0;
1364  int retval = pthread_mutex_destroy(&stop_lock_);
1365  if(retval != 0) perror("error");
1366  retval = pthread_mutex_init(&stop_lock_,0);
1367  if(retval != 0) perror("error");
1369  fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
1370  fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition
1371  fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition
1372  try{
1373  xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
1374  if(lsid) {
1375  ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died
1376  }
1377  }
1378  catch(...){
1379  std::cout << "trouble with lsindex during restart" << std::endl;
1380  }
1381  try{
1382  xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
1383  if(lstb) {
1384  ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting
1385  }
1386  }
1387  catch(...){
1388  std::cout << "trouble with resetting flag for eol recovery " << std::endl;
1389  }
1390 
1393  enableMPEPSlave();
1394  return false; // exit the supervisor loop immediately in the child !!!
1395  }
1396  else
1397  {
1398  std::ostringstream ost1;
1399  ost1 << "-I- New Process " << rr << " forked for slot " << i;
1400  localLog(ost1.str());
1401  }
1402  }
1403  }
1404  if(subs_[i].countdown()>=0) subs_[i].countdown()--;
1405  }
1406  }
1407  pthread_mutex_unlock(&stop_lock_);
1408  } // finished handling replacement of dead slaves once they've been reaped
1409  }
1410  xdata::Serializable *lsid = 0;
1411  xdata::Serializable *psid = 0;
1412  xdata::Serializable *dqmp = 0;
1413  xdata::UnsignedInteger32 *dqm = 0;
1414 
1415 
1416 
1417  if(running && edm_init_done_){
1418  try{
1419  lsid = applicationInfoSpace_->find("lumiSectionIndex");
1420  psid = applicationInfoSpace_->find("prescaleSetIndex");
1421  nbProcessed = monitorInfoSpace_->find("nbProcessed");
1422  nbAccepted = monitorInfoSpace_->find("nbAccepted");
1423  dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
1424  }
1426  LOG4CPLUS_INFO(getApplicationLogger(),"could not retrieve some data - " << e.what());
1427  }
1428 
1429  try{
1430  if(nbProcessed !=0 && nbAccepted !=0)
1431  {
1432  xdata::UnsignedInteger32*nbp = ((xdata::UnsignedInteger32*)nbProcessed);
1433  xdata::UnsignedInteger32*nba = ((xdata::UnsignedInteger32*)nbAccepted);
1434  xdata::UnsignedInteger32*ls = ((xdata::UnsignedInteger32*)lsid);
1435  xdata::UnsignedInteger32*ps = ((xdata::UnsignedInteger32*)psid);
1436  if(dqmp!=0)
1437  dqm = (xdata::UnsignedInteger32*)dqmp;
1438  if(dqm) dqm->value_ = 0;
1439  nbTotalDQM_ = 0;
1440  nbp->value_ = 0;
1441  nba->value_ = 0;
1442  nblive_ = 0;
1443  nbdead_ = 0;
1444  scalersUpdates_ = 0;
1445 
1446  for(unsigned int i = 0; i < subs_.size(); i++)
1447  {
1448  if(subs_[i].alive()>0)
1449  {
1450  nblive_++;
1451  try{
1452  subs_[i].post(master_message_prg_,true);
1453 
1454  unsigned long retval = subs_[i].rcvNonBlocking(master_message_prr_,true);
1455  if(retval == (unsigned long) master_message_prr_->mtype){
1456  prg* p = (struct prg*)(master_message_prr_->mtext);
1457  subs_[i].setParams(p);
1458  spMStates_[i] = p->Ms;
1459  spmStates_[i] = p->ms;
1460  cpustat_->addEntry(p->ms);
1461  if(!subs_[i].inInconsistentState() &&
1465  {
1466  std::ostringstream ost;
1467  ost << "edm::eventprocessor slot " << i << " process id "
1468  << subs_[i].pid() << " not in Running state : Mstate="
1469  << evtProcessor_.stateNameFromIndex(p->Ms) << " mstate="
1471  << " - Look into possible error messages from HLT process";
1472  LOG4CPLUS_WARN(getApplicationLogger(),ost.str());
1473  }
1474  nbp->value_ += subs_[i].params().nbp;
1475  nba->value_ += subs_[i].params().nba;
1476  if(dqm)dqm->value_ += p->dqm;
1477  nbTotalDQM_ += p->dqm;
1478  scalersUpdates_ += p->trp;
1479  if(p->ls > ls->value_) ls->value_ = p->ls;
1480  if(p->ps != ps->value_) ps->value_ = p->ps;
1481  }
1482  else{
1483  nbp->value_ += subs_[i].get_save_nbp();
1484  nba->value_ += subs_[i].get_save_nba();
1485  }
1486  }
1487  catch(evf::Exception &e){
1488  LOG4CPLUS_INFO(getApplicationLogger(),
1489  "could not send/receive msg on slot "
1490  << i << " - " << e.what());
1491  }
1492 
1493  }
1494  else
1495  {
1496  nbp->value_ += subs_[i].get_save_nbp();
1497  nba->value_ += subs_[i].get_save_nba();
1498  nbdead_++;
1499  }
1500  }
1501  if(nbp->value_>64){//have some slaves already processed more than one event ? (eventually make this == number of raw cells)
1502  for(unsigned int i = 0; i < subs_.size(); i++)
1503  {
1504  if(subs_[i].params().nbp == 0){ // a slave has processed 0 events
1505  // check that the process is not stuck
1506  if(subs_[i].alive()>0 && subs_[i].params().ms == 0) // the process is seen alive but in us=Invalid(0)
1507  {
1508  subs_[i].found_invalid();//increase the "found_invalid" counter
1509  if(subs_[i].nfound_invalid() > 60){ //wait x monitor cycles (~1 min a good time ?) before doing something about a stuck slave
1510  MsgBuf msg3(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_FSTOP); // send a force-stop signal
1511  subs_[i].post(msg3,false);
1512  std::ostringstream ost1;
1513  ost1 << "-W- Process in slot " << i << " Never reached the running state - forcestopping it";
1514  localLog(ost1.str());
1515  LOG4CPLUS_ERROR(getApplicationLogger(),ost1.str());
1516  XCEPT_DECLARE(evf::Exception,
1517  sentinelException, ost1.str());
1518  notifyQualified("error",sentinelException);
1519 
1520  }
1521  }
1522  }
1523  }
1524  }
1525  }
1526  }
1527  catch(std::exception &e){
1528  LOG4CPLUS_INFO(getApplicationLogger(),"std exception - " << e.what());
1529  }
1530  catch(...){
1531  LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception ");
1532  }
1533  }
1534  else{
1535  for(unsigned int i = 0; i < subs_.size(); i++)
1536  {
1537  if(subs_[i].alive()==-1000)
1538  {
1540  spmStates_[i] = 0;
1541  }
1542  }
1543  }
1544  try{
1545  monitorInfoSpace_->lock();
1546  monitorInfoSpace_->fireItemGroupChanged(names_,0);
1547  monitorInfoSpace_->unlock();
1548  }
1550  {
1551  LOG4CPLUS_ERROR(log_, "Exception from fireItemGroupChanged: " << e.what());
1552  // localLog(e.what());
1553  }
1554  ::sleep(superSleepSec_.value_);
1555  return true;
1556 }
1557 
1559 {
1560  try {
1561  wlScalers_=
1562  toolbox::task::getWorkLoopFactory()->getWorkLoop("Scalers",
1563  "waiting");
1564  if (!wlScalers_->isActive()) wlScalers_->activate();
1565  asScalers_ = toolbox::task::bind(this,&FUEventProcessor::scalers,
1566  "Scalers");
1567 
1568  wlScalers_->submit(asScalers_);
1569  wlScalersActive_ = true;
1570  }
1571  catch (xcept::Exception& e) {
1572  std::string msg = "Failed to start workloop 'Scalers'.";
1573  XCEPT_RETHROW(evf::Exception,msg,e);
1574  }
1575 }
1576 
1577 //______________________________________________________________________________
1578 
1580 {
1581  try {
1582  wlSummarize_=
1583  toolbox::task::getWorkLoopFactory()->getWorkLoop("Summary",
1584  "waiting");
1585  if (!wlSummarize_->isActive()) wlSummarize_->activate();
1586 
1587  asSummarize_ = toolbox::task::bind(this,&FUEventProcessor::summarize,
1588  "Summary");
1589 
1590  wlSummarize_->submit(asSummarize_);
1591  wlSummarizeActive_ = true;
1592  }
1593  catch (xcept::Exception& e) {
1594  std::string msg = "Failed to start workloop 'Summarize'.";
1595  XCEPT_RETHROW(evf::Exception,msg,e);
1596  }
1597 }
1598 
1599 //______________________________________________________________________________
1600 
1601 bool FUEventProcessor::scalers(toolbox::task::WorkLoop* wl)
1602 {
1603  if(evtProcessor_)
1604  {
1605  if(!evtProcessor_.getTriggerReport(true)) {
1606  wlScalersActive_ = false;
1607  return false;
1608  }
1609  }
1610  else
1611  {
1612  std::cout << getpid()<< " Scalers workloop, bailing out, no evtProcessor " << std::endl;
1613  wlScalersActive_ = false;
1614  return false;
1615  }
1616  if(myProcess_)
1617  {
1618  // std::cout << getpid() << "going to post on control queue from scalers" << std::endl;
1620  if(ret!=0) std::cout << "scalers workloop, error posting to sqs_ " << errno << std::endl;
1621  scalersUpdates_++;
1622  }
1623  else
1625  return true;
1626 }
1627 
1628 //______________________________________________________________________________
1629 bool FUEventProcessor::summarize(toolbox::task::WorkLoop* wl)
1630 {
1632  bool atLeastOneProcessUpdatedSuccessfully = false;
1633  int msgCount = 0;
1634  for (unsigned int i = 0; i < subs_.size(); i++)
1635  {
1636  if(subs_[i].alive()>0)
1637  {
1638  int ret = 0;
1639  if(subs_[i].check_postponed_trigger_update(master_message_trr_,
1641  {
1642  ret = MSQS_MESSAGE_TYPE_TRR;
1643  std::cout << "using postponed report from slot " << i << " for ls " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
1644  }
1645  else{
1646  bool insync = false;
1647  bool exception_caught = false;
1648  while(!insync){
1649  try{
1650  ret = subs_[i].rcv(master_message_trr_,false);
1651  }
1652  catch(evf::Exception &e)
1653  {
1654  std::cout << "exception in msgrcv on " << i
1655  << " " << subs_[i].alive() << " " << strerror(errno) << std::endl;
1656  exception_caught = true;
1657  break;
1658  //do nothing special
1659  }
1660  if(ret==MSQS_MESSAGE_TYPE_TRR) {
1663  insync = true;
1664  }
1665  }
1666  }
1667  if(exception_caught) continue;
1668  }
1669  msgCount++;
1670  if(ret==MSQS_MESSAGE_TYPE_TRR) {
1673  std::cout << "postpone handling of msg from slot " << i << " with Ls " << trp->lumiSection
1674  << " should be " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
1675  subs_[i].add_postponed_trigger_update(master_message_trr_);
1676  }else{
1677  atLeastOneProcessUpdatedSuccessfully = true;
1679  }
1680  }
1681  else std::cout << "msgrcv returned error " << errno << std::endl;
1682  }
1683  }
1684  if(atLeastOneProcessUpdatedSuccessfully){
1685  nbSubProcessesReporting_.value_ = msgCount;
1690  }
1691  else{
1692  LOG4CPLUS_WARN(getApplicationLogger(),"Summarize loop: no process updated successfully - sleep 10 seconds before trying again");
1693  if(msgCount==0) evtProcessor_.withdrawLumiSectionIncrement();
1694  nbSubProcessesReporting_.value_ = 0;
1695  ::sleep(10);
1696  }
1697  if(fsm_.stateName()->toString()!="Enabled"){
1698  wlScalersActive_ = false;
1699  return false;
1700  }
1701  // cpustat_->printStat();
1702  if(iDieStatisticsGathering_.value_){
1703  try{
1704  unsigned long long idleTmp=idleProcStats_;
1705  unsigned long long allPSTmp=allProcStats_;
1707 
1709  timeval oldtime=lastProcReport_;
1710  gettimeofday(&lastProcReport_,0);
1711 
1712  if (allPSTmp!=0 && idleTmp!=0 && allProcStats_!=allPSTmp) {
1713  cpustat_->setCPUStat(1000 - ((idleProcStats_-idleTmp)*1000)/(allProcStats_-allPSTmp));
1714  int deltaTms=1000 * (lastProcReport_.tv_sec-oldtime.tv_sec)
1715  + (lastProcReport_.tv_usec-oldtime.tv_usec)/1000;
1716  cpustat_->setElapsed(deltaTms);
1717  }
1718  else {
1719  cpustat_->setCPUStat(0);
1720  cpustat_->setElapsed(0);
1721  }
1722 
1726  ratestat_->sendStat((unsigned char*)trsp,
1727  sizeof(TriggerReportStatic),
1729  }catch(evf::Exception &e){
1730  LOG4CPLUS_INFO(getApplicationLogger(),"coud not send statistics"
1731  << e.what());
1732  }
1733  }
1734  cpustat_->reset();
1735  return true;
1736 }
1737 
1738 
1739 
1740 bool FUEventProcessor::receivingAndMonitor(toolbox::task::WorkLoop *)
1741 {
1742  try{
1743  myProcess_->rcvSlave(slave_message_monitoring_,true); //will receive only messages from Master
1744  switch(slave_message_monitoring_->mtype)
1745  {
1746  case MSQM_MESSAGE_TYPE_MCS:
1747  {
1748  xgi::Input *in = 0;
1749  xgi::Output out;
1750  evtProcessor_.microState(in,&out);
1751  MsgBuf msg1(out.str().size(),MSQS_MESSAGE_TYPE_MCR);
1752  strncpy(msg1->mtext,out.str().c_str(),out.str().size());
1753  myProcess_->postSlave(msg1,true);
1754  break;
1755  }
1756 
1757  case MSQM_MESSAGE_TYPE_PRG:
1758  {
1759  xdata::Serializable *dqmp = 0;
1760  xdata::UnsignedInteger32 *dqm = 0;
1762  try{
1763  dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
1764  } catch(xdata::exception::Exception e){}
1765  if(dqmp!=0)
1766  dqm = (xdata::UnsignedInteger32*)dqmp;
1767 
1768  // monitorInfoSpace_->lock();
1769  prg * data = (prg*)slave_message_prr_->mtext;
1770  data->ls = evtProcessor_.lsid_;
1772  data->ps = evtProcessor_.psid_;
1773  data->nbp = evtProcessor_->totalEvents();
1774  data->nba = evtProcessor_->totalEventsPassed();
1775  data->Ms = evtProcessor_.epMAltState_.value_;
1776  data->ms = evtProcessor_.epmAltState_.value_;
1777  if(dqm) data->dqm = dqm->value_; else data->dqm = 0;
1778  data->trp = scalersUpdates_;
1779  // monitorInfoSpace_->unlock();
1781  if(exitOnError_.value_)
1782  {
1783  // after each monitoring cycle check if we are in inconsistent state and exit if configured to do so
1784  // std::cout << getpid() << "receivingAndMonitor: trying to acquire stop lock " << std::endl;
1785  if(data->Ms == edm::event_processor::sError)
1786  {
1787  bool running = true;
1788  int count = 0;
1789  while(running){
1790  int retval = pthread_mutex_lock(&stop_lock_);
1791  if(retval != 0) perror("error");
1792  running = fsm_.stateName()->toString()=="Enabled";
1793  if(count>5) _exit(-1);
1794  pthread_mutex_unlock(&stop_lock_);
1795  if(running) {::sleep(1); count++;}
1796  }
1797  }
1798  }
1799  break;
1800  }
1801  case MSQM_MESSAGE_TYPE_WEB:
1802  {
1803  xgi::Input *in = 0;
1804  xgi::Output out;
1805  unsigned int bytesToSend = 0;
1807  std::string query = slave_message_monitoring_->mtext;
1808  size_t pos = query.find_first_of("&");
1809  std::string method;
1810  std::string args;
1811  if(pos!=std::string::npos)
1812  {
1813  method = query.substr(0,pos);
1814  args = query.substr(pos+1,query.length()-pos-1);
1815  }
1816  else
1817  method=query;
1818 
1819  if(method=="Spotlight")
1820  {
1821  spotlightWebPage(in,&out);
1822  }
1823  else if(method=="procStat")
1824  {
1825  procStat(in,&out);
1826  }
1827  else if(method=="moduleWeb")
1828  {
1829  internal::MyCgi mycgi;
1830  boost::char_separator<char> sep(";");
1831  boost::tokenizer<boost::char_separator<char> > tokens(args, sep);
1832  for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
1833  tok_iter != tokens.end(); ++tok_iter){
1834  size_t pos = (*tok_iter).find_first_of("%");
1835  if(pos != std::string::npos){
1836  std::string first = (*tok_iter).substr(0 , pos);
1837  std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
1838  mycgi.getEnvironment()[first]=second;
1839  }
1840  }
1841  moduleWeb(&mycgi,&out);
1842  }
1843  else if(method=="Default")
1844  {
1845  defaultWebPage(in,&out);
1846  }
1847  else
1848  {
1849  out << "Error 404!!!!!!!!" << std::endl;
1850  }
1851 
1852 
1853  bytesToSend = out.str().size();
1854  unsigned int cycle = 0;
1855  if(bytesToSend==0)
1856  {
1857  snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", bytesToSend);
1858  myProcess_->postSlave(msg1,true);
1859  }
1860  while(bytesToSend !=0){
1861  unsigned int msgSize = bytesToSend>MAX_PIPE_BUFFER_SIZE ? MAX_PIPE_BUFFER_SIZE : bytesToSend;
1863  out.str().c_str()+MAX_PIPE_BUFFER_SIZE*cycle,
1864  msgSize);
1865  snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", msgSize);
1866  myProcess_->postSlave(msg1,true);
1867  bytesToSend -= msgSize;
1868  cycle++;
1869  }
1870  break;
1871  }
1872  case MSQM_MESSAGE_TYPE_TRP:
1873  {
1874  break;
1875  }
1876  }
1877  }
1878  catch(evf::Exception &e){std::cout << "exception caught in recevingM: " << e.what() << std::endl;}
1879  return true;
1880 }
1881 
1883 {
1884  //todo rewind/check semaphore
1885  //start workloop
1886  try {
1888  toolbox::task::getWorkLoopFactory()->getWorkLoop("SignalMonitor",
1889  "waiting");
1890 
1891  if (!wlSignalMonitor_->isActive()) wlSignalMonitor_->activate();
1892  asSignalMonitor_ = toolbox::task::bind(this,&FUEventProcessor::sigmon,
1893  "SignalMonitor");
1895  signalMonitorActive_ = true;
1896  }
1897  catch (xcept::Exception& e) {
1898  std::string msg = "Failed to start workloop 'SignalMonitor'. (3)";
1899  std::cout << e.what() << std::endl;
1900  XCEPT_RETHROW(evf::Exception,msg,e);
1901  }
1902 }
1903 
1904 
1905 bool FUEventProcessor::sigmon(toolbox::task::WorkLoop* wl)
1906 {
1907  while (1) {
1908  sem_wait(sigmon_sem_);
1909  std::cout << " received signal notification from slave!"<< std::endl;
1910 
1911  //check if shutdown time
1912  bool running = fsm_.stateName()->toString()=="Enabled";
1913  bool stopping = fsm_.stateName()->toString()=="stopping";
1914  bool enabling = fsm_.stateName()->toString()=="enabling";
1915  if (!running && !enabling) {
1916  signalMonitorActive_ = false;
1917  return false;
1918  }
1919 
1920  crashesThisRun_++;
1921  gettimeofday(&lastCrashTime_,0);
1922 
1923  //set core size limit to 0 in master and slaves
1924  if (crashesThisRun_>=crashesToDump_.value_ && (running || stopping) && !rlimit_coresize_changed_) {
1925 
1926  rlimit rlold;
1927  getrlimit(RLIMIT_CORE,&rlold);
1928  rlimit rlnew = rlold;
1929  rlnew.rlim_cur=0;
1930  setrlimit(RLIMIT_CORE,&rlnew);
1932  MsgBuf master_message_rli_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLI);
1933  //in case of frequent crashes, allow first slot to dump (until restart)
1934  unsigned int min=1;
1935  for (unsigned int i = min; i < subs_.size(); i++) {
1936  try {
1937  if (subs_[i].alive()) {
1938  subs_[i].post(master_message_rli_,false);
1939  }
1940  }
1941  catch (...) {}
1942  }
1943  std::ostringstream ostr;
1944  ostr << "Number of recent slave crashes reaches " << crashesThisRun_
1945  << ". Disabling core dumps for next 15 minutes in this FilterUnit";
1946  LOG4CPLUS_WARN(getApplicationLogger(),ostr.str());
1947  }
1948  }//end while loop
1949  signalMonitorActive_ = false;
1950  return false;
1951 }
1952 
1953 
1955 {
1956  try {
1957  if(hasShMem_) attachDqmToShm();
1958  int sc = 0;
1960  if(isRunNumberSetter_)
1962  else
1964 
1965  try{
1966  ::sleep(1);
1968  sc = evtProcessor_->statusAsync();
1969  }
1970  catch(cms::Exception &e) {
1974  return false;
1975  }
1976  catch(std::exception &e) {
1977  reasonForFailedState_ = e.what();
1980  return false;
1981  }
1982  catch(...) {
1983  reasonForFailedState_ = "Unknown Exception";
1986  return false;
1987  }
1988  if(sc != 0) {
1989  std::ostringstream oss;
1990  oss<<"EventProcessor::runAsync returned status code " << sc;
1991  reasonForFailedState_ = oss.str();
1994  return false;
1995  }
1996  }
1997  catch (xcept::Exception &e) {
1998  reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
2001  return false;
2002  }
2003  try{
2004  fsm_.fireEvent("EnableDone",this);
2005  }
2006  catch (xcept::Exception &e) {
2007  std::cout << "exception " << (std::string)e.what() << std::endl;
2008  throw;
2009  }
2010 
2011  return false;
2012 }
2013 
2015  ((FUEventProcessor*)addr)->forkProcessesFromEDM();
2016 }
2017 
2019 
2020  moduleweb::ForkParams * forkParams = &(forkInfoObj_->forkParams);
2021  unsigned int forkFrom=0;
2022  unsigned int forkTo=nbSubProcesses_.value_;
2023  if (forkParams->slotId>=0) {
2024  forkFrom=forkParams->slotId;
2025  forkTo=forkParams->slotId+1;
2026  }
2027 
2028  //before fork, make sure to disconnect output modules from Shm
2029  try {
2030  if (sorRef_) {
2031  std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
2032  for (unsigned int i=0;i<shmOutputs.size();i++) {
2033  //unregister PID from ShmBuffer/RB
2034  shmOutputs[i]->unregisterFromShm();
2035  //disconnect from Shm
2036  shmOutputs[i]->stop();
2037  }
2038  }
2039  }
2040  catch (std::exception &e)
2041  {
2042  reasonForFailedState_ = (std::string)"Thrown exception while disconnecting ShmOutputModule from Shm: " + e.what();
2043  LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
2046  }
2047  catch (...) {
2048  reasonForFailedState_ = "Thrown unknown exception while disconnecting ShmOutputModule from Shm: ";
2049  LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
2052  }
2053 
2054  std::string currentState = fsm_.stateName()->toString();
2055 
2056  //destroy MessageServicePresence thread before fork
2057  if (currentState!="stopping") {
2058  try {
2059  messageServicePresence_.reset();
2060  }
2061  catch (...) {
2062  LOG4CPLUS_ERROR(getApplicationLogger(),"Unable to destroy MessageService thread before fork!");
2063  }
2064  }
2065 
2066  if (currentState=="stopping") {
2067  LOG4CPLUS_ERROR(getApplicationLogger(),"Can not fork subprocesses in state " << fsm_.stateName()->toString());
2068  forkParams->isMaster=1;
2071  }
2072  //fork loop
2073  else for(unsigned int i=forkFrom; i<forkTo; i++)
2074  {
2075  int retval = subs_[i].forkNew();
2076  if(retval==0)
2077  {
2078  forkParams->isMaster=0;
2079  myProcess_ = &subs_[i];
2080  // dirty hack: delete/recreate global binary semaphore for later use in child
2082  toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
2083  int retval = pthread_mutex_destroy(&stop_lock_);
2084  if(retval != 0) perror("error");
2085  retval = pthread_mutex_init(&stop_lock_,0);
2086  if(retval != 0) perror("error");
2088 
2089  //recreate MessageLogger thread in slave after fork
2090  try{
2092  if(pf != 0) {
2093  messageServicePresence_ = pf->makePresence("MessageServicePresence");
2094  }
2095  else {
2096  LOG4CPLUS_ERROR(getApplicationLogger(),
2097  "SLAVE: Unable to create message service presence. pid:"<<getpid());
2098  }
2099  }
2100  catch(...) {
2101  LOG4CPLUS_ERROR(getApplicationLogger(),"SLAVE: Unknown Exception in MessageServicePresence. pid:"<<getpid());
2102  }
2103 
2105 
2106  //reconnect to Shm from output modules
2107  try {
2108  if (sorRef_) {
2109  std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
2110  for (unsigned int i=0;i<shmOutputs.size();i++)
2111  shmOutputs[i]->start();
2112  }
2113  }
2114  catch (...)
2115  {
2116  LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception (ShmOutputModule sending InitMsg (pid:"<<getpid() <<")");
2117  }
2118 
2119  if (forkParams->restart) {
2120  //do restart things
2121  scalersUpdates_ = 0;
2122  try {
2123  fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
2124  fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition
2125  fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition
2126  } catch (...) {
2127  LOG4CPLUS_WARN(getApplicationLogger(),"Failed to Stop/Enable FSM of the restarted slave EP");
2128  }
2129  try{
2130  xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
2131  if(lsid) {
2132  ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died
2133  }
2134  }
2135  catch(...){
2136  std::cout << "trouble with lsindex during restart" << std::endl;
2137  }
2138  try{
2139  xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
2140  if(lstb) {
2141  ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting
2142  }
2143  }
2144  catch(...){
2145  std::cout << "trouble with resetting flag for eol recovery " << std::endl;
2146  }
2149  }
2150 
2151  //start other threads
2155  while(!evtProcessor_.isWaitingForLs())
2156  ::usleep(100000);//wait for scalers loop to start
2157 
2158  //connect DQMShmOutputModule
2159  if(hasShMem_) attachDqmToShm();
2160 
2161  //catch transition error if we are already Enabled
2162  try {
2163  fsm_.fireEvent("EnableDone",this);
2164  }
2165  catch (...) {}
2166 
2167  //make sure workloops are started
2168  while (!wlReceiving_->isActive() || !wlReceivingMonitor_->isActive()) usleep(10000);
2169 
2170  //unmask signals
2171  sigset_t tmpset_thread;
2172  sigemptyset(&tmpset_thread);
2173  sigaddset(&tmpset_thread, SIGQUIT);
2174  sigaddset(&tmpset_thread, SIGILL);
2175  sigaddset(&tmpset_thread, SIGABRT);
2176  sigaddset(&tmpset_thread, SIGFPE);
2177  sigaddset(&tmpset_thread, SIGSEGV);
2178  sigaddset(&tmpset_thread, SIGALRM);
2179  //sigprocmask(SIG_UNBLOCK, &tmpset_thread, 0);
2180  pthread_sigmask(SIG_UNBLOCK,&tmpset_thread,0);
2181 
2182  //set signal handlers
2183  struct sigaction sa;
2184  sigset_t tmpset;
2185  memset(&tmpset,0,sizeof(tmpset));
2186  sigemptyset(&tmpset);
2187  sa.sa_mask=tmpset;
2188  sa.sa_flags=SA_RESETHAND | SA_SIGINFO;
2189  sa.sa_handler=0;
2190  sa.sa_sigaction=evfep_sighandler;
2191 
2192  sigaction(SIGQUIT,&sa,0);
2193  sigaction(SIGILL,&sa,0);
2194  sigaction(SIGABRT,&sa,0);
2195  sigaction(SIGFPE,&sa,0);
2196  sigaction(SIGSEGV,&sa,0);
2197  sa.sa_sigaction=evfep_alarmhandler;
2198  sigaction(SIGALRM,&sa,0);
2199 
2200  //child return to DaqSource
2201  return ;
2202  }
2203  else {
2204 
2205  forkParams->isMaster=1;
2207  if (forkParams->restart) {
2208  std::ostringstream ost1;
2209  ost1 << "-I- New Process " << retval << " forked for slot " << forkParams->slotId;
2210  localLog(ost1.str());
2211  }
2213  //start "crash" receiver workloop
2214  }
2215  }
2216 
2217  //recreate MessageLogger thread after fork
2218  try{
2219  //release the presense factory in master
2221  if(pf != 0) {
2222  messageServicePresence_ = pf->makePresence("MessageServicePresence");
2223  }
2224  else {
2225  LOG4CPLUS_ERROR(getApplicationLogger(),
2226  "Unable to recreate message service presence ");
2227  }
2228  }
2229  catch(...) {
2230  LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception in MessageServicePresence");
2231  }
2232  restart_in_progress_=false;
2233  edm_init_done_=true;
2234 }
2235 
2237 {
2239  try {
2240  //set to connect to Shm later
2241  //if(hasShMem_) setAttachDqmToShm();
2242 
2243  int sc = 0;
2244  //maybe not needed in MP mode
2246  if(isRunNumberSetter_)
2248  else
2250 
2251  //prepare object used to communicate with DaqSource
2252  pthread_mutex_destroy(&forkObjLock_);
2253  pthread_mutex_init(&forkObjLock_,0);
2254  if (forkInfoObj_) delete forkInfoObj_;
2257  forkInfoObj_->fuAddr=(void*)this;
2263  if (mwrRef_)
2264  mwrRef_->publishForkInfo(std::string("DaqSource"),forkInfoObj_);
2265 
2267  sc = evtProcessor_->statusAsync();
2268 
2269  if(sc != 0) {
2270  std::ostringstream oss;
2271  oss<<"EventProcessor::runAsync returned status code " << sc;
2272  reasonForFailedState_ = oss.str();
2274  LOG4CPLUS_FATAL(log_,reasonForFailedState_);
2275  return false;
2276  }
2277  }
2278  //catch exceptions on master side
2279  catch(cms::Exception &e) {
2282  LOG4CPLUS_FATAL(log_,reasonForFailedState_);
2283  return false;
2284  }
2285  catch(std::exception &e) {
2286  reasonForFailedState_ = e.what();
2288  LOG4CPLUS_FATAL(log_,reasonForFailedState_);
2289  return false;
2290  }
2291  catch(...) {
2292  reasonForFailedState_ = "Unknown Exception";
2294  LOG4CPLUS_FATAL(log_,reasonForFailedState_);
2295  return false;
2296  }
2297  return true;
2298 }
2299 
2300 bool FUEventProcessor::restartForkInEDM(unsigned int slotId) {
2301  //daqsource will keep this lock until master returns after fork
2302  //so that we don't do another EP restart in between
2303  forkInfoObj_->lock();
2304  forkInfoObj_->forkParams.slotId=slotId;
2308  LOG4CPLUS_DEBUG(log_, " restarting subprocess in slot "<< slotId <<": posting on control semaphore");
2309  sem_post(forkInfoObj_->control_sem_);
2310  forkInfoObj_->unlock();
2311  usleep(1000);
2312  //sleep until fork is performed
2313  int count=50;
2314  restart_in_progress_=true;
2315  while (restart_in_progress_ && count--) usleep(20000);
2316  return true;
2317 }
2318 
2320 {
2321  bool retval = enableCommon();
2323  LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
2324  ::sleep(1);
2325  }
2326 
2327  // implementation moved to EPWrapper
2328  // startScalersWorkLoop(); // this is now not done any longer
2329  localLog("-I- Start completed");
2330  return retval;
2331 }
2333 {
2334  //all this happens only in the child process
2335 
2340  while(!evtProcessor_.isWaitingForLs())
2341  ::sleep(1);
2342 
2343  // @EM test do not run monitor loop in slave, only receiving&Monitor
2344  // evtProcessor_.startMonitoringWorkLoop();
2345  try{
2346  // evtProcessor_.makeServicesOnly();
2347  try{
2349  if(pf != 0) {
2350  pf->makePresence("MessageServicePresence").release();
2351  }
2352  else {
2353  LOG4CPLUS_ERROR(getApplicationLogger(),
2354  "Unable to create message service presence ");
2355  }
2356  }
2357  catch(...) {
2358  LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
2359  }
2361  }
2362  catch (xcept::Exception &e) {
2363  reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
2366  }
2367  bool retval = enableCommon();
2368  // while(evtProcessor_->getState()!= edm::event_processor::sRunning){
2369  // LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
2370  // ::sleep(1);
2371  // }
2372  return retval;
2373 }
2374 
2376 {
2377  bool failed=false;
2378  try {
2379  LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping :) ...");
2382  fsm_.fireEvent("StopDone",this);
2383  else
2384  {
2385  failed=true;
2386  // epMState_ = evtProcessor_->currentStateName();
2388  reasonForFailedState_ = "EventProcessor stop timed out";
2389  else
2390  reasonForFailedState_ = "EventProcessor did not receive STOP event";
2391  }
2392  }
2393  catch (xcept::Exception &e) {
2394  failed=true;
2395  reasonForFailedState_ = "Stopping FAILED: " + (std::string)e.what();
2396  }
2397  catch (edm::Exception &e) {
2398  failed=true;
2399  reasonForFailedState_ = "Stopping FAILED: " + (std::string)e.what();
2400  }
2401  catch (...) {
2402  failed=true;
2403  reasonForFailedState_= "Stopping FAILED: unknown exception";
2404  }
2405  try {
2406  if (hasShMem_) {
2407  detachDqmFromShm();
2408  if (failed)
2409  LOG4CPLUS_WARN(getApplicationLogger(),
2410  "In failed STOP - success detaching DQM from Shm. pid:" << getpid());
2411  }
2412  }
2413  catch (cms::Exception & e) {
2414  failed=true;
2415  reasonForFailedState_= "Stopping FAILED: " + (std::string)e.what();
2416  }
2417  catch (...) {
2418  failed=true;
2419  reasonForFailedState_= "DQM detach failed: Unknown exception";
2420  }
2421 
2422  if (failed) {
2423  LOG4CPLUS_FATAL(getApplicationLogger(),"STOP failed: "
2424  << reasonForFailedState_ << " (pid:" << getpid()<<")");
2427  }
2428 
2429  LOG4CPLUS_INFO(getApplicationLogger(),"Finished stopping!");
2430  localLog("-I- Stop completed");
2431  return false;
2432 }
2433 
2435 {
2438 
2439  std::vector<bool> processes_to_stop(nbSubProcesses_.value_,false);
2440  for(unsigned int i = 0; i < subs_.size(); i++)
2441  {
2442  pthread_mutex_lock(&stop_lock_);
2443  if(subs_[i].alive()>0){
2444  processes_to_stop[i] = true;
2445  subs_[i].post(msg,false);
2446  }
2447  pthread_mutex_unlock(&stop_lock_);
2448  }
2449  for(unsigned int i = 0; i < subs_.size(); i++)
2450  {
2451  pthread_mutex_lock(&stop_lock_);
2452  if(processes_to_stop[i]){
2453  try{
2454  subs_[i].rcv(msg1,false);
2455  }
2456  catch(evf::Exception &e){
2457  std::ostringstream ost;
2458  ost << "failed to get STOP - errno ->" << errno << " " << e.what();
2459  reasonForFailedState_ = ost.str();
2460  LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
2461  // fsm_.fireFailed(reasonForFailedState_,this);
2463  pthread_mutex_unlock(&stop_lock_);
2464  continue;
2465  }
2466  }
2467  else {
2468  pthread_mutex_unlock(&stop_lock_);
2469  continue;
2470  }
2471  pthread_mutex_unlock(&stop_lock_);
2472  if(msg1->mtype==MSQS_MESSAGE_TYPE_STOP)
2473  while(subs_[i].alive()>0) ::usleep(10000);
2474  subs_[i].disconnect();
2475  }
2476  // subs_.clear();
2477 
2478 }
2479 
2481 {
2482  std::string urn = getApplicationDescriptor()->getURN();
2483  try{
2486  if(myProcess_) {std::cout << "microstate called for child! bail out" << std::endl; return;}
2487  *out << "<tr><td>" << fsm_.stateName()->toString()
2488  << "</td><td>"<< (myProcess_ ? "S" : "M") <<"</td><td>" << nblive_ << "</td><td>"
2489  << nbdead_ << "</td><td><a href=\"/" << urn << "/procStat\">" << getpid() <<"</a></td>";
2490  evtProcessor_.microState(in,out);
2491  *out << "<td></td><td>" << nbTotalDQM_
2492  << "</td><td>" << evtProcessor_.getScalersUpdates() << "</td></tr>";
2493  if(nbSubProcesses_.value_!=0 && !myProcess_)
2494  {
2495  pthread_mutex_lock(&start_lock_);
2496  for(unsigned int i = 0; i < subs_.size(); i++)
2497  {
2498  try{
2499  if(subs_[i].alive()>0)
2500  {
2501  *out << "<tr><td bgcolor=\"#00FF00\" id=\"a"
2502  << i << "\">""Alive</td><td>S</td><td>"
2503  << subs_[i].queueId() << "<td>"
2504  << subs_[i].queueStatus()<< "/"
2505  << subs_[i].queueOccupancy() << "/"
2506  << subs_[i].queuePidOfLastSend() << "/"
2507  << subs_[i].queuePidOfLastReceive()
2508  << "</td><td><a id=\"p"<< i << "\" href=\"SubWeb?process="
2509  << subs_[i].pid() << "&method=procStat\">"
2510  << subs_[i].pid()<<"</a></td>" //<< msg->mtext;
2511  << "<td>" << evtProcessor_.stateNameFromIndex(subs_[i].params().Ms) << "</td><td>"
2512  << evtProcessor_.moduleNameFromIndex(subs_[i].params().ms) << "</td><td>"
2513  << subs_[i].params().nba << "/" << subs_[i].params().nbp
2514  << " (" << float(subs_[i].params().nba)/float(subs_[i].params().nbp)*100. <<"%)"
2515  << "</td><td>" << subs_[i].params().ls << "/" << subs_[i].params().ls
2516  << "</td><td>" << subs_[i].params().ps
2517  << "</td><td"
2518  << ((subs_[i].params().eols<subs_[i].params().ls) ? " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"") << ">"
2519  << subs_[i].params().eols
2520  << "</td><td>" << subs_[i].params().dqm
2521  << "</td><td>" << subs_[i].params().trp << "</td>";
2522  }
2523  else
2524  {
2525  pthread_mutex_lock(&pickup_lock_);
2526  *out << "<tr><td id=\"a"<< i << "\" ";
2527  if(subs_[i].alive()==-1000)
2528  *out << " bgcolor=\"#bbaabb\">NotInitialized";
2529  else
2530  *out << (subs_[i].alive()==0 ? ">Done" : " bgcolor=\"#FF0000\">Dead");
2531  *out << "</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
2532  << subs_[i].queueStatus() << "/"
2533  << subs_[i].queueOccupancy() << "/"
2534  << subs_[i].queuePidOfLastSend() << "/"
2535  << subs_[i].queuePidOfLastReceive()
2536  << "</td><td id=\"p"<< i << "\">"
2537  <<subs_[i].pid()<<"</td><td colspan=\"5\">" << subs_[i].reasonForFailed();
2538  if(subs_[i].alive()!=0 && subs_[i].alive()!=-1000)
2539  {
2540  if(autoRestartSlaves_ && subs_[i].restartCount()<=2)
2541  *out << " will restart in " << subs_[i].countdown() << " s";
2542  else if(autoRestartSlaves_)
2543  *out << " reached maximum restart count";
2544  else *out << " autoRestart is disabled ";
2545  }
2546  *out << "</td><td"
2547  << ((subs_[i].params().eols<subs_[i].params().ls) ?
2548  " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")
2549  << ">"
2550  << subs_[i].params().eols
2551  << "</td><td>" << subs_[i].params().dqm
2552  << "</td><td>" << subs_[i].params().trp << "</td>";
2553  pthread_mutex_unlock(&pickup_lock_);
2554  }
2555  *out << "</tr>";
2556  }
2557  catch(evf::Exception &e){
2558  *out << "<tr><td id=\"a"<< i << "\" "
2559  <<"bgcolor=\"#FFFF00\">NotResp</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
2560  << subs_[i].queueStatus() << "/"
2561  << subs_[i].queueOccupancy() << "/"
2562  << subs_[i].queuePidOfLastSend() << "/"
2563  << subs_[i].queuePidOfLastReceive()
2564  << "</td><td id=\"p"<< i << "\">"
2565  <<subs_[i].pid()<<"</td>";
2566  }
2567  }
2568  pthread_mutex_unlock(&start_lock_);
2569  }
2570  }
2571  catch(evf::Exception &e)
2572  {
2573  LOG4CPLUS_INFO(getApplicationLogger(),"evf::Exception caught in microstate - " << e.what());
2574  }
2575  catch(cms::Exception &e)
2576  {
2577  LOG4CPLUS_INFO(getApplicationLogger(),"cms::Exception caught in microstate - " << e.what());
2578  }
2579  catch(std::exception &e)
2580  {
2581  LOG4CPLUS_INFO(getApplicationLogger(),"std::Exception caught in microstate - " << e.what());
2582  }
2583  catch(...)
2584  {
2585  LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception caught in microstate - ");
2586  }
2587 
2588 }
2589 
2590 
2592 {
2593  using namespace utils;
2594 
2595  *out << updaterStatic_;
2596  mDiv(out,"loads");
2597  uptime(out);
2598  cDiv(out);
2599  mDiv(out,"st",fsm_.stateName()->toString());
2600  mDiv(out,"ru",runNumber_.toString());
2601  mDiv(out,"nsl",nbSubProcesses_.value_);
2602  mDiv(out,"nsr",nbSubProcessesReporting_.value_);
2603  mDiv(out,"cl");
2604  *out << getApplicationDescriptor()->getClassName()
2605  << (nbSubProcesses_.value_ > 0 ? "MP " : " ");
2606  cDiv(out);
2607  mDiv(out,"in",getApplicationDescriptor()->getInstance());
2608  if(fsm_.stateName()->toString() != "Halted" && fsm_.stateName()->toString() != "halting"){
2609  mDiv(out,"hlt");
2610  *out << "<a href=\"" << configString_.toString() << "\">HLT Config</a>";
2611  cDiv(out);
2612  *out << std::endl;
2613  }
2614  else
2615  mDiv(out,"hlt","Not yet...");
2616 
2617  mDiv(out,"sq",squidPresent_.toString());
2618  mDiv(out,"vwl",(supervising_ ? "Active" : "Not Initialized"));
2619  mDiv(out,"mwl",evtProcessor_.wlMonitoring());
2620  if(nbProcessed != 0 && nbAccepted != 0)
2621  {
2622  mDiv(out,"tt",((xdata::UnsignedInteger32*)nbProcessed)->value_);
2623  mDiv(out,"ac",((xdata::UnsignedInteger32*)nbAccepted)->value_);
2624  }
2625  else
2626  {
2627  mDiv(out,"tt",0);
2628  mDiv(out,"ac",0);
2629  }
2630  if(!myProcess_)
2631  mDiv(out,"swl",(wlSummarizeActive_ ? "Active" : "Inactive"));
2632  else
2633  mDiv(out,"swl",(wlScalersActive_ ? "Active" : "Inactive"));
2634 
2635  mDiv(out,"idi",iDieUrl_.value_);
2636  if(vp_!=0){
2637  mDiv(out,"vpi",(unsigned int) vp_);
2638  if(vulture_->hasStarted()>=0)
2639  mDiv(out,"vul","Prowling");
2640  else
2641  mDiv(out,"vul","Dead");
2642  }
2643  else{
2644  mDiv(out,"vul",(vulture_==0 ? "Nope" : "Hatching"));
2645  }
2646  if(evtProcessor_){
2647  mDiv(out,"ll");
2648  *out << evtProcessor_.lastLumi().ls
2649  << "," << evtProcessor_.lastLumi().proc << "," << evtProcessor_.lastLumi().acc;
2650  cDiv(out);
2651  }
2652  mDiv(out,"lg");
2653  for(unsigned int i = logRingIndex_; i<logRingSize_; i++)
2654  *out << logRing_[i] << std::endl;
2655  if(logWrap_)
2656  for(unsigned int i = 0; i<logRingIndex_; i++)
2657  *out << logRing_[i] << std::endl;
2658  cDiv(out);
2659  mDiv(out,"cha");
2660  if(cpustat_) *out << cpustat_->getChart();
2661  cDiv(out);
2662 }
2663 
2665 {
2666  evf::utils::procStat(out);
2667 }
2668 
2670 {
2671  if(myProcess_) myProcess_->postSlave(buf,true);
2672 }
2673 
2675 {
2676  using namespace utils;
2677  std::ostringstream ost;
2678  mDiv(&ost,"ve");
2679  ost<< "$Revision: 1.162 $ (" << edm::getReleaseVersion() <<")";
2680  cDiv(&ost);
2681  mDiv(&ost,"ou",outPut_.toString());
2682  mDiv(&ost,"sh",hasShMem_.toString());
2683  mDiv(&ost,"mw",hasModuleWebRegistry_.toString());
2684  mDiv(&ost,"sw",hasServiceWebRegistry_.toString());
2685 
2686  xdata::Serializable *monsleep = 0;
2687  xdata::Serializable *lstimeout = 0;
2688  try{
2689  monsleep = applicationInfoSpace_->find("monSleepSec");
2690  lstimeout = applicationInfoSpace_->find("lsTimeOut");
2691  }
2693  }
2694 
2695  if(monsleep!=0)
2696  mDiv(&ost,"ms",monsleep->toString());
2697  if(lstimeout!=0)
2698  mDiv(&ost,"lst",lstimeout->toString());
2699  char cbuf[sizeof(struct utsname)];
2700  struct utsname* buf = (struct utsname*)cbuf;
2701  uname(buf);
2702  mDiv(&ost,"sysinfo");
2703  ost << buf->sysname << " " << buf->nodename
2704  << " " << buf->release << " " << buf->version << " " << buf->machine;
2705  cDiv(&ost);
2706  updaterStatic_ = ost.str();
2707 }
2708 
2709 void FUEventProcessor::handleSignalSlave(int sig, siginfo_t* info, void* c)
2710 {
2711  //notify master
2712  sem_post(sigmon_sem_);
2713 
2714  //sleep while master takes action
2715  sleep(2);
2716 
2717  //set up alarm if handler deadlocks on unsafe actions
2718  alarm(5);
2719 
2720  std::cout << "--- Slave EP signal handler caught signal " << sig << " process id is " << info->si_pid <<" ---" << std::endl;
2721  std::cout << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl;
2722  std::cout << "--- Stacktrace follows --" << std::endl;
2723  std::ostringstream stacktr;
2724  toolbox::stacktrace(20,stacktr);
2725  std::cout << stacktr.str();
2727  std::cout << "--- Dumping core." << " --- " << std::endl;
2728  else
2729  std::cout << "--- Core dump count exceeded on this FU. ---"<<std::endl;
2730 
2731  std::string hasdump = "";
2732  if (rlimit_coresize_changed_) hasdump = " (core dump disabled) ";
2733 
2734  LOG4CPLUS_ERROR(getApplicationLogger(), "--- Slave EP signal handler caught signal " << sig << ". process id is " << getpid()
2735  << " on node " << toolbox::net::getHostName() << " ---" << std::endl
2736  << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl
2737  << "--- Stacktrace follows"<< hasdump << " ---" << std::endl << stacktr.str()
2738  );
2739 
2740  //re-raise signal with default handler (will cause core dump if enabled)
2741  raise(sig);
2742 }
2743 
2744 
2745 XDAQ_INSTANTIATOR_IMPL(evf::FUEventProcessor)
xdata::Boolean hasModuleWebRegistry_
static const char runNumber_[]
virtual char const * what() const
Definition: Exception.cc:141
dbl * delta
Definition: mlp_gen.cc:36
void summaryWebPage(xgi::Input *, xgi::Output *, const std::string &)
Definition: FWEPWrapper.cc:815
unsigned int ps
Definition: queue_defs.h:60
int stop()
Definition: Vulture.cc:254
unsigned long rcvSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:116
ShmOutputModuleRegistry * sorRef_
void spotlightWebPage(xgi::Input *, xgi::Output *)
bool check()
Definition: SquidNet.cc:21
xdata::Vector< xdata::Integer > spMStates_
int i
Definition: DBlmapReader.cc:9
xoap::MessageReference commandCallback(xoap::MessageReference msg)
Definition: StateMachine.cc:71
xdata::InfoSpace * monitorInfoSpace_
unsigned int eols
Definition: queue_defs.h:59
void subWeb(xgi::Input *in, xgi::Output *out)
pid_t makeProcess()
Definition: Vulture.cc:154
#define Input(cl)
Definition: vmac.h:189
event_processor::State getState() const
unsigned int nbp
Definition: queue_defs.h:61
std::vector< std::string > logRing_
bool receivingAndMonitor(toolbox::task::WorkLoop *wl)
xdata::UnsignedInteger32 slaveRestartDelaySecs_
xdata::Boolean datasetCounting_
void updateRollingReport()
xdata::UnsignedInteger32 runNumber_
#define MSGQ_MESSAGE_TYPE_RANGE
Definition: queue_defs.h:13
void updater(xgi::Input *in, xgi::Output *out)
toolbox::task::WorkLoop * wlScalers_
pthread_mutex_t start_lock_
xdata::Integer epmAltState_
Definition: FWEPWrapper.h:197
std::map< std::string, std::string, std::less< std::string > > & getEnvironment()
#define MSQS_MESSAGE_TYPE_TRR
Definition: queue_defs.h:34
void startMonitoringWorkLoop()
Definition: FWEPWrapper.cc:614
virtual std::string explainSelf() const
Definition: Exception.cc:146
void publishForkInfo(std::string name, moduleweb::ForkInfoObj *forkInfoObj)
xdata::UnsignedInteger32 crashesToDump_
void taskWebPage(xgi::Input *, xgi::Output *, const std::string &)
Definition: FWEPWrapper.cc:881
tuple pieces
Definition: csv2json.py:31
bool restartForkInEDM(unsigned int slotId)
xdata::Boolean isRunNumberSetter_
void setAppCtxt(xdaq::ApplicationContext *ctx)
Definition: FWEPWrapper.h:91
ShmOutputModuleRegistry * getShmOutputModuleRegistry()
Definition: FWEPWrapper.cc:491
unsigned int acc
Definition: FWEPWrapper.h:45
xdata::UnsignedInteger32 forkInEDM_
void sendAuxLegenda(const std::string &)
Definition: RateStat.cc:29
xdata::Boolean hasServiceWebRegistry_
void clearCounters()
Clears counters used by trigger report.
toolbox::task::WorkLoop * wlReceiving_
void setCPUStat(int busyPer1k)
Definition: CPUStat.h:25
#define nullptr
boost::tokenizer< boost::char_separator< char > > tokenizer
void init(unsigned short, std::string &)
Definition: FWEPWrapper.cc:193
xdata::Boolean hasShMem_
MsgBuf & getPackedTriggerReport()
Definition: FWEPWrapper.h:114
#define MSQM_MESSAGE_TYPE_FSTOP
Definition: queue_defs.h:17
def pipe
Definition: pipe.py:5
toolbox::task::ActionSignature * asReceiveMsgAndExecute_
#define NULL
Definition: scimark2.h:8
toolbox::task::ActionSignature * asSignalMonitor_
FUEventProcessor * FUInstancePtr_
void(* forkHandler)(void *)
Definition: ModuleWeb.h:44
#define min(a, b)
Definition: mlp_lapack.h:161
unsigned int ls
Definition: queue_defs.h:58
void procStat(std::ostringstream *out)
Definition: procUtils.cc:176
void setupFastTimerService(unsigned int nProcesses)
Definition: FWEPWrapper.cc:497
xdata::InfoSpace * scalersLegendaInfoSpace_
toolbox::task::ActionSignature * asSupervisor_
void setRunNumber(RunNumber_t runNumber)
xdata::Boolean autoRestartSlaves_
tuple els
Definition: asciidump.py:420
std::string const & moduleNameFromIndex(unsigned int i) const
Definition: FWEPWrapper.h:122
xdata::UnsignedInteger32 nbSubProcessesReporting_
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
void forceInitEventProcessorMaybe()
Definition: FWEPWrapper.h:68
unsigned int crashesThisRun_
void css(xgi::Input *in, xgi::Output *out)
#define MAX_MSG_SIZE
Definition: queue_defs.h:10
void cDiv(std::ostringstream *out)
Definition: procUtils.cc:357
lsTriplet & lastLumi()
Definition: FWEPWrapper.h:133
void sleep(Duration_t)
Definition: Utils.h:163
#define MSQM_MESSAGE_TYPE_RLR
Definition: queue_defs.h:25
toolbox::task::ActionSignature * asReceiveMsgAndRead_
void setElapsed(int mseconds)
Definition: CPUStat.h:28
xdata::Boolean exitOnError_
void evfep_sighandler(int sig, siginfo_t *info, void *c)
std::vector< SubProcess > subs_
xdata::InfoSpace * monitorLegendaInfoSpace_
void initialize(T *app)
Definition: StateMachine.h:84
bool summarize(toolbox::task::WorkLoop *wl)
void getSlavePids(xgi::Input *in, xgi::Output *out)
xdata::Serializable * nbProcessed
xoap::MessageReference fsmCallback(xoap::MessageReference msg)
pthread_mutex_t forkObjLock_
U second(std::pair< T, U > const &p)
std::string wlMonitoring()
Definition: FWEPWrapper.h:98
#define MSQM_MESSAGE_TYPE_PRG
Definition: queue_defs.h:19
xdata::Bag< xdaq2rc::ClassnameAndInstance > * rcmsStateListener()
Definition: StateMachine.h:72
void fireFailed(const std::string &errorMsg, void *originator)
#define MSQS_MESSAGE_TYPE_WEB
Definition: queue_defs.h:33
void adjustLsIndexForRestart()
Definition: FWEPWrapper.h:112
bool halting(toolbox::task::WorkLoop *wl)
#define MSQS_MESSAGE_TYPE_STOP
Definition: queue_defs.h:31
#define MSQS_MESSAGE_TYPE_PRR
Definition: queue_defs.h:32
bool fireScalersUpdate()
Definition: FWEPWrapper.cc:777
evf::StateMachine fsm_
bool scalers(toolbox::task::WorkLoop *wl)
unsigned int stopCondition
Definition: ModuleWeb.h:46
#define MSQM_MESSAGE_TYPE_RLI
Definition: queue_defs.h:24
int cycle
ModuleWebRegistry * getModuleWebRegistry()
Definition: FWEPWrapper.cc:485
unsigned int lsid_
Definition: FWEPWrapper.h:210
toolbox::task::WorkLoop * wlSupervising_
void declareRunNumber(RunNumber_t runNumber)
static void setAppl(xdaq::Application *app)
Definition: MLlog4cplus.cc:85
std::string reasonForFailedState_
void resetLumiSectionReferenceIndex()
Definition: FWEPWrapper.h:137
unsigned int proc
Definition: FWEPWrapper.h:44
bool monitoring(toolbox::task::WorkLoop *wl)
Definition: FWEPWrapper.cc:647
FUEventProcessor(xdaq::ApplicationStub *s)
int postSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:110
#define PIPE_READ
Definition: queue_defs.h:40
unsigned int psid_
Definition: FWEPWrapper.h:211
unsigned long long idleProcStats_
bool isAvailable() const
Definition: Service.h:47
xdata::Boolean iDieStatisticsGathering_
int evfep_raised_signal
int j
Definition: DBlmapReader.cc:9
unsigned int Ms
Definition: queue_defs.h:63
int start(std::string, int=0)
Definition: Vulture.cc:241
void setAppDesc(xdaq::ApplicationDescriptor *ad)
Definition: FWEPWrapper.h:90
bool sigmon(toolbox::task::WorkLoop *wl)
void setNproc(int nproc)
Definition: CPUStat.h:22
uint16_t mem[nChs][nEvts]
void scalersWeb(xgi::Input *, xgi::Output *)
void resetPackedTriggerReport()
Definition: FWEPWrapper.h:111
xdata::Boolean squidPresent_
list mod
Load physics model.
std::list< std::string > names_
void setScalersInfoSpace(xdata::InfoSpace *sis, xdata::InfoSpace *slis)
Definition: FWEPWrapper.h:77
void addEntry(int sta)
Definition: CPUStat.h:17
std::vector< edm::FUShmOutputModule * > getShmOutputModules()
void sendStat(unsigned int)
Definition: CPUStat.cc:33
xdata::Boolean hasPrescaleService_
#define NUMERIC_MESSAGE_SIZE
Definition: queue_defs.h:38
bool first
Definition: L1TdeRCT.cc:94
int totalEvents() const
unsigned int getNumberOfMicrostates()
Definition: FWEPWrapper.h:140
void handleSignalSlave(int sig, siginfo_t *info, void *c)
bool receiving(toolbox::task::WorkLoop *wl)
bool stopping(toolbox::task::WorkLoop *wl)
void reset()
Definition: CPUStat.h:31
void procStat(xgi::Input *in, xgi::Output *out)
moduleweb::ForkInfoObj * forkInfoObj_
toolbox::task::WorkLoop * wlSummarize_
unsigned int nba
Definition: queue_defs.h:62
char const * stateName(event_processor::State s) const
void fireEvent(const std::string &evtType, void *originator)
xdata::Serializable * nbAccepted
tuple out
Definition: dbtoconf.py:99
void disableRcmsStateNotification()
Definition: StateMachine.h:64
toolbox::BSem * _s_mutex_ptr_
std::string getReleaseVersion()
#define MSQM_MESSAGE_TYPE_TRP
Definition: queue_defs.h:21
void localLog(std::string)
static PresenceFactory * get()
#define MSQS_MESSAGE_TYPE_MCR
Definition: queue_defs.h:30
void setMonitorInfoSpace(xdata::InfoSpace *mis, xdata::InfoSpace *mlis)
Definition: FWEPWrapper.h:83
static void forkProcessFromEDM_helper(void *addr)
ModuleWebRegistry * mwrRef_
bool configuring(toolbox::task::WorkLoop *wl)
int notstarted_state_code() const
Definition: FWEPWrapper.h:132
xdata::InfoSpace * applicationInfoSpace_
edm::EventProcessor::StatusCode stop()
Definition: FWEPWrapper.cc:503
xdata::UnsignedInteger32 nbSubProcesses_
bool enabling(toolbox::task::WorkLoop *wl)
std::auto_ptr< Presence > makePresence(std::string const &presence_type) const
xdata::String * stateName()
Definition: StateMachine.h:69
StatusCode statusAsync() const
void setApplicationInfoSpace(xdata::InfoSpace *is)
Definition: FWEPWrapper.h:82
unsigned int getLumiSectionReferenceIndex()
Definition: FWEPWrapper.h:139
std::string & getChart()
Definition: CPUStat.h:43
bool getTriggerReport(bool useLock)
Definition: FWEPWrapper.cc:694
unsigned int scalersUpdates_
xdata::String configString_
xdata::Boolean * foundRcmsStateListener()
Definition: StateMachine.h:78
std::vector< std::string > const & getmicromap() const
Definition: FWEPWrapper.h:141
xdata::UnsignedInteger32 superSleepSec_
ServiceToken getToken()
unsigned long long allProcStats_
xdata::UnsignedInteger32 lastLumiUsingEol_
Definition: FWEPWrapper.h:222
std::auto_ptr< edm::Presence > messageServicePresence_
#define PIPE_WRITE
Definition: queue_defs.h:41
std::string const & stateNameFromIndex(unsigned int i) const
Definition: FWEPWrapper.h:127
void sendMessageOverMonitorQueue(MsgBuf &)
pthread_mutex_t * mst_lock_
Definition: ModuleWeb.h:49
bool isWaitingForLs()
Definition: FWEPWrapper.h:135
void moduleWeb(xgi::Input *in, xgi::Output *out)
std::string const & configuration() const
Definition: FWEPWrapper.h:102
#define MSQM_MESSAGE_TYPE_STOP
Definition: queue_defs.h:16
void pathNames(xgi::Input *, xgi::Output *)
dictionary args
int stacktrace(void *addresses[], int nmax)
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
#define Output(cl)
Definition: vmac.h:193
void procCpuStat(unsigned long long &idleJiffies, unsigned long long &allJiffies)
Definition: procUtils.cc:157
void sendLegenda(const std::vector< std::string > &)
Definition: CPUStat.cc:39
unsigned int ls
Definition: FWEPWrapper.h:43
pthread_mutex_t pickup_lock_
void defaultWebPage(xgi::Input *in, xgi::Output *out)
unsigned int trp
Definition: queue_defs.h:66
toolbox::task::ActionSignature * asSummarize_
toolbox::task::WorkLoop * wlSignalMonitor_
tuple query
Definition: o2o.py:269
xdata::Vector< xdata::Integer > spmStates_
int hasStarted()
Definition: Vulture.cc:215
void sendLegenda(const std::string &)
Definition: RateStat.cc:24
xdata::Integer epMAltState_
Definition: FWEPWrapper.h:196
tuple cout
Definition: gather_cfg.py:121
xdata::UnsignedInteger32 instance_
void serviceWeb(xgi::Input *in, xgi::Output *out)
#define MSQM_MESSAGE_TYPE_MCS
Definition: queue_defs.h:18
void mDiv(std::ostringstream *out, std::string name)
Definition: procUtils.cc:354
edm::EventSummary eventSummary
toolbox::task::WorkLoop * wlReceivingMonitor_
void withdrawLumiSectionIncrement()
Definition: FWEPWrapper.h:138
#define MSQM_MESSAGE_TYPE_WEB
Definition: queue_defs.h:20
void microState(xgi::Input *in, xgi::Output *out)
void sumAndPackTriggerReport(MsgBuf &buf)
toolbox::task::ActionSignature * asScalers_
bool supervisor(toolbox::task::WorkLoop *wl)
TriggerReportStatic * getPackedTriggerReportAsStruct()
Definition: FWEPWrapper.h:115
#define MAX_PIPE_BUFFER_SIZE
Definition: queue_defs.h:42
int totalEventsPassed() const
pthread_mutex_t stop_lock_
void evfep_alarmhandler(int sig, siginfo_t *info, void *c)
unsigned int getScalersUpdates()
Definition: FWEPWrapper.h:136
unsigned int dqm
Definition: queue_defs.h:65
void uptime(std::ostringstream *out)
Definition: procUtils.cc:309
void publishConfigAndMonitorItems(bool)
Definition: FWEPWrapper.cc:112
xdata::InfoSpace * scalersInfoSpace_
unsigned int ms
Definition: queue_defs.h:64
void sendStat(const unsigned char *, size_t, unsigned int)
Definition: RateStat.cc:19
void actionPerformed(xdata::Event &e)
void microState(xgi::Input *in, xgi::Output *out)
void setRcms(xdaq::ApplicationDescriptor *rcms)
Definition: FWEPWrapper.h:89
static const unsigned int logRingSize_
xdata::Boolean epInitialized_
void findRcmsStateListener()
void enableEndPaths(bool active)