CMS 3D CMS Logo

All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FUEventProcessor.cc
Go to the documentation of this file.
1 //
3 // FUEventProcessor
4 // ----------------
5 //
7 
8 #include "FUEventProcessor.h"
9 #include "procUtils.h"
12 
14 
22 
29 
30 #include "toolbox/BSem.h"
31 #include "toolbox/Runtime.h"
32 #include "toolbox/stacktrace.h"
33 #include "toolbox/net/Utils.h"
34 
35 #include <boost/tokenizer.hpp>
36 
37 #include "xcept/tools.h"
38 #include "xgi/Method.h"
39 
40 #include "cgicc/CgiDefs.h"
41 #include "cgicc/Cgicc.h"
42 #include "cgicc/FormEntry.h"
43 
44 
45 #include <sys/wait.h>
46 #include <sys/utsname.h>
47 #include <sys/mman.h>
48 #include <signal.h>
49 
50 #include <typeinfo>
51 #include <stdlib.h>
52 #include <stdio.h>
53 #include <errno.h>
54 
55 
56 using namespace evf;
57 using namespace cgicc;
// Declares the pointer toolbox::mem::_s_mutex_ptr_ as extern (its definition is
// not part of this file view) so that code in this file can assign to it.
// FUEventProcessor::enabling() stores a newly allocated toolbox::BSem in it
// in the forked child process.
58 namespace toolbox {
59  namespace mem {
60  extern toolbox::BSem * _s_mutex_ptr_;
61  }
62 }
63 
64 //signal handler (global)
65 namespace evf {
68  void evfep_sighandler(int sig, siginfo_t* info, void* c)
69  {
71  FUInstancePtr_->handleSignalSlave(sig, info, c);
72  }
73  void evfep_alarmhandler(int sig, siginfo_t* info, void* c)
74  {
75  if (evfep_raised_signal) {
76  signal(evfep_raised_signal,SIG_DFL);
77  raise(evfep_raised_signal);
78  }
79  }
80 }
81 
83 // construction/destruction
85 
86 //______________________________________________________________________________
87 FUEventProcessor::FUEventProcessor(xdaq::ApplicationStub *s)
88  : xdaq::Application(s)
89  , fsm_(this)
90  , log_(getApplicationLogger())
91  , evtProcessor_(log_, getApplicationDescriptor()->getInstance())
92  , runNumber_(0)
93  , epInitialized_(false)
94  , outPut_(true)
95  , autoRestartSlaves_(false)
96  , slaveRestartDelaySecs_(10)
97  , hasShMem_(true)
98  , hasPrescaleService_(true)
99  , hasModuleWebRegistry_(true)
100  , hasServiceWebRegistry_(true)
101  , isRunNumberSetter_(true)
102  , iDieStatisticsGathering_(false)
103  , outprev_(true)
104  , exitOnError_(true)
105  , reasonForFailedState_()
106  , squidnet_(3128,"http://localhost:8000/RELEASE-NOTES.txt")
107  , logRing_(logRingSize_)
108  , logRingIndex_(logRingSize_)
109  , logWrap_(false)
110  , nbSubProcesses_(0)
111  , nbSubProcessesReporting_(0)
112  , forkInEDM_(true)
113  , nblive_(0)
114  , nbdead_(0)
115  , nbTotalDQM_(0)
116  , wlReceiving_(0)
117  , asReceiveMsgAndExecute_(0)
118  , receiving_(false)
119  , wlReceivingMonitor_(0)
120  , asReceiveMsgAndRead_(0)
121  , receivingM_(false)
122  , myProcess_(0)
123  , wlSupervising_(0)
124  , asSupervisor_(0)
125  , supervising_(false)
126  , wlSignalMonitor_(0)
127  , asSignalMonitor_(0)
128  , signalMonitorActive_(false)
129  , monitorInfoSpace_(0)
130  , monitorLegendaInfoSpace_(0)
131  , applicationInfoSpace_(0)
132  , nbProcessed(0)
133  , nbAccepted(0)
134  , scalersInfoSpace_(0)
135  , scalersLegendaInfoSpace_(0)
136  , wlScalers_(0)
137  , asScalers_(0)
138  , wlScalersActive_(false)
139  , scalersUpdates_(0)
140  , wlSummarize_(0)
141  , asSummarize_(0)
142  , wlSummarizeActive_(false)
143  , superSleepSec_(1)
144  , iDieUrl_("none")
145  , vulture_(0)
146  , vp_(0)
147  , cpustat_(0)
148  , ratestat_(0)
149  , mwrRef_(nullptr)
150  , sorRef_(nullptr)
151  , master_message_prg_(0,MSQM_MESSAGE_TYPE_PRG)
152  , master_message_prr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_PRR)
153  , slave_message_prr_(sizeof(prg),MSQS_MESSAGE_TYPE_PRR)
154  , master_message_trr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_TRR)
155  , edm_init_done_(true)
156  , crashesThisRun_(false)
157  , rlimit_coresize_changed_(false)
158  , crashesToDump_(2)
159  , sigmon_sem_(0)
160  , datasetCounting_(true)
161 {
162  using namespace utils;
163 
164  FUInstancePtr_=this;
165 
166  names_.push_back("nbProcessed" );
167  names_.push_back("nbAccepted" );
168  names_.push_back("epMacroStateInt");
169  names_.push_back("epMicroStateInt");
170  // create pipe for web communication
171  int retpipe = pipe(anonymousPipe_);
172  if(retpipe != 0)
173  LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to create pipe");
174  // check squid presence
176  //pass application parameters to FWEPWrapper
177  evtProcessor_.setAppDesc(getApplicationDescriptor());
178  evtProcessor_.setAppCtxt(getApplicationContext());
179  // bind relevant callbacks to finite state machine
181 
182  //set sourceId_
183  url_ =
184  getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
185  getApplicationDescriptor()->getURN();
186  class_ =getApplicationDescriptor()->getClassName();
187  instance_=getApplicationDescriptor()->getInstance();
188  sourceId_=class_.toString()+"_"+instance_.toString();
189  LOG4CPLUS_INFO(getApplicationLogger(),sourceId_ <<" constructor" );
190  LOG4CPLUS_INFO(getApplicationLogger(),"CMSSW_BASE:"<<getenv("CMSSW_BASE"));
191 
192  getApplicationDescriptor()->setAttribute("icon", "/evf/images/epicon.jpg");
193 
194  xdata::InfoSpace *ispace = getApplicationInfoSpace();
195  applicationInfoSpace_ = ispace;
196 
197  // default configuration
198  ispace->fireItemAvailable("parameterSet", &configString_ );
199  ispace->fireItemAvailable("epInitialized", &epInitialized_ );
200  ispace->fireItemAvailable("stateName", fsm_.stateName() );
201  ispace->fireItemAvailable("runNumber", &runNumber_ );
202  ispace->fireItemAvailable("outputEnabled", &outPut_ );
203 
204  ispace->fireItemAvailable("hasSharedMemory", &hasShMem_);
205  ispace->fireItemAvailable("hasPrescaleService", &hasPrescaleService_ );
206  ispace->fireItemAvailable("hasModuleWebRegistry", &hasModuleWebRegistry_ );
207  ispace->fireItemAvailable("hasServiceWebRegistry", &hasServiceWebRegistry_ );
208  ispace->fireItemAvailable("isRunNumberSetter", &isRunNumberSetter_ );
209  ispace->fireItemAvailable("iDieStatisticsGathering", &iDieStatisticsGathering_);
210  ispace->fireItemAvailable("rcmsStateListener", fsm_.rcmsStateListener() );
211  ispace->fireItemAvailable("foundRcmsStateListener",fsm_.foundRcmsStateListener());
212  ispace->fireItemAvailable("nbSubProcesses", &nbSubProcesses_ );
213  ispace->fireItemAvailable("nbSubProcessesReporting",&nbSubProcessesReporting_ );
214  ispace->fireItemAvailable("forkInEDM" ,&forkInEDM_ );
215  ispace->fireItemAvailable("superSleepSec", &superSleepSec_ );
216  ispace->fireItemAvailable("autoRestartSlaves", &autoRestartSlaves_ );
217  ispace->fireItemAvailable("slaveRestartDelaySecs",&slaveRestartDelaySecs_ );
218  ispace->fireItemAvailable("iDieUrl", &iDieUrl_ );
219  ispace->fireItemAvailable("crashesToDump" ,&crashesToDump_ );
220  ispace->fireItemAvailable("datasetCounting" ,&datasetCounting_ );
221 
222  // Add infospace listeners for exporting data values
223  getApplicationInfoSpace()->addItemChangedListener("parameterSet", this);
224  getApplicationInfoSpace()->addItemChangedListener("outputEnabled", this);
225 
226  // findRcmsStateListener
228 
229  // initialize monitoring infospace
230 
231  std::string monInfoSpaceName="evf-eventprocessor-status-monitor";
232  toolbox::net::URN urn = this->createQualifiedInfoSpace(monInfoSpaceName);
233  monitorInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
234 
235  std::string monLegendaInfoSpaceName="evf-eventprocessor-status-legenda";
236  urn = this->createQualifiedInfoSpace(monLegendaInfoSpaceName);
237  monitorLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
238 
239 
240  monitorInfoSpace_->fireItemAvailable("url", &url_ );
241  monitorInfoSpace_->fireItemAvailable("class", &class_ );
242  monitorInfoSpace_->fireItemAvailable("instance", &instance_ );
243  monitorInfoSpace_->fireItemAvailable("runNumber", &runNumber_ );
244  monitorInfoSpace_->fireItemAvailable("stateName", fsm_.stateName());
245 
246  monitorInfoSpace_->fireItemAvailable("squidPresent", &squidPresent_ );
247 
248  std::string scalersInfoSpaceName="evf-eventprocessor-scalers-monitor";
249  urn = this->createQualifiedInfoSpace(scalersInfoSpaceName);
250  scalersInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
251 
252  std::string scalersLegendaInfoSpaceName="evf-eventprocessor-scalers-legenda";
253  urn = this->createQualifiedInfoSpace(scalersLegendaInfoSpaceName);
254  scalersLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
255 
256 
257 
259  scalersInfoSpace_->fireItemAvailable("instance", &instance_);
260 
264 
265  //subprocess state vectors for MP
266  monitorInfoSpace_->fireItemAvailable("epMacroStateInt", &spMStates_);
267  monitorInfoSpace_->fireItemAvailable("epMicroStateInt", &spmStates_);
268 
269  // Bind web interface
270  xgi::bind(this, &FUEventProcessor::css, "styles.css");
271  xgi::bind(this, &FUEventProcessor::defaultWebPage, "Default" );
272  xgi::bind(this, &FUEventProcessor::spotlightWebPage, "Spotlight" );
273  xgi::bind(this, &FUEventProcessor::scalersWeb, "scalersWeb");
274  xgi::bind(this, &FUEventProcessor::pathNames, "pathNames" );
275  xgi::bind(this, &FUEventProcessor::subWeb, "SubWeb" );
276  xgi::bind(this, &FUEventProcessor::getSlavePids, "getSlavePids");
277  xgi::bind(this, &FUEventProcessor::moduleWeb, "moduleWeb" );
278  xgi::bind(this, &FUEventProcessor::serviceWeb, "serviceWeb");
279  xgi::bind(this, &FUEventProcessor::microState, "microState");
280  xgi::bind(this, &FUEventProcessor::updater, "updater" );
281  xgi::bind(this, &FUEventProcessor::procStat, "procStat" );
282 
283  // instantiate the plugin manager, not referenced here after!
284 
286 
287  //create Message Service thread and pass ownership to auto_ptr that we destroy before fork
288  try{
289  LOG4CPLUS_DEBUG(getApplicationLogger(),
290  "Trying to create message service presence ");
292  if(pf != 0) {
293  messageServicePresence_= pf->makePresence("MessageServicePresence");
294  }
295  else {
296  LOG4CPLUS_ERROR(getApplicationLogger(),
297  "Unable to create message service presence ");
298  }
299  }
300  catch(...) {
301  LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
302  }
304 
305  typedef std::set<xdaq::ApplicationDescriptor*> AppDescSet_t;
306  typedef AppDescSet_t::iterator AppDescIter_t;
307 
308  AppDescSet_t rcms=
309  getApplicationContext()->getDefaultZone()->
310  getApplicationDescriptors("RCMSStateListener");
311  if(rcms.size()==0)
312  {
313  LOG4CPLUS_WARN(getApplicationLogger(),
314  "MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
315  // localLog("-W- MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
316  }
317  else
318  {
319  AppDescIter_t it = rcms.begin();
320  evtProcessor_.setRcms(*it);
321  }
322  pthread_mutex_init(&start_lock_,0);
323  pthread_mutex_init(&stop_lock_,0);
324  pthread_mutex_init(&pickup_lock_,0);
325 
326  forkInfoObj_=nullptr;
327  pthread_mutex_init(&forkObjLock_,0);
328  makeStaticInfo();
330 
331  if(vulture_==0) vulture_ = new Vulture(true);
332 
334 
335  AppDescSet_t setOfiDie=
336  getApplicationContext()->getDefaultZone()->
337  getApplicationDescriptors("evf::iDie");
338 
339  for (AppDescIter_t it=setOfiDie.begin();it!=setOfiDie.end();++it)
340  if ((*it)->getInstance()==0) // there has to be only one instance of iDie
341  iDieUrl_ = (*it)->getContextDescriptor()->getURL() + "/" + (*it)->getURN();
342 
343  //save default core file size
344  getrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
345 
346  //prepare IPC semaphore for getting the workloop waked up on signal caught in slaves
347  #ifdef linux
348  if (sigmon_sem_==0) {
349  sigmon_sem_ = (sem_t*)mmap(NULL, sizeof(sem_t),
350  PROT_READ | PROT_WRITE,
351  MAP_ANONYMOUS | MAP_SHARED, 0, 0);
352  if (!sigmon_sem_) {
353  perror("mmap error\n");
354  std::cout << "mmap error"<<std::endl;
355  }
356  else
357  sem_init(sigmon_sem_,true,0);
358  }
359  #endif
360 }
361 //___________here ends the *huge* constructor___________________________________
362 
363 
364 //______________________________________________________________________________
366 {
367  // no longer needed since the wrapper is a member of the class and one can rely on
368  // implicit destructor - to be revised - at any rate the most common exit path is via "kill"...
369  // if (evtProcessor_) delete evtProcessor_;
370  if(vulture_ != 0) delete vulture_;
371 }
372 
373 
375 // implementation of member functions
377 
378 
379 //______________________________________________________________________________
380 bool FUEventProcessor::configuring(toolbox::task::WorkLoop* wl)
381 {
382 // std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
383 // << ((instance_.value_==0) ? 0x8 : 0) << " "
384 // << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
385 // << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
386 // << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
387  unsigned short smap
388  = (datasetCounting_.value_ ? 0x20 : 0 )
389  + ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
390  + (((instance_.value_%80)==0) ? 0x8 : 0) // have at least one legend per slice
391  + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
392  + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
393  + (hasPrescaleService_.value_ ? 0x1 : 0);
394  if(nbSubProcesses_.value_==0)
395  {
396  spMStates_.setSize(1);
397  spmStates_.setSize(1);
398  }
399  else
400  {
401  spMStates_.setSize(nbSubProcesses_.value_);
402  spmStates_.setSize(nbSubProcesses_.value_);
403  for(unsigned int i = 0; i < spMStates_.size(); i++)
404  {
406  spmStates_[i] = 0;
407  }
408  }
409  try {
410  LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ...");
411  std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
412  epInitialized_=true;
413  if(evtProcessor_)
414  {
415  //get ref of mwr
418  // moved to wrapper class
423  if(cpustat_) {delete cpustat_; cpustat_=0;}
425  nbSubProcesses_.value_,
426  instance_.value_,
427  iDieUrl_.value_);
428  if(ratestat_) {delete ratestat_; ratestat_=0;}
429  ratestat_ = new RateStat(iDieUrl_.value_);
430  if(iDieStatisticsGathering_.value_)
431  {
432  try
433  {
435  xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
436  if(legenda !=0)
437  {
438  std::string slegenda = ((xdata::String*)legenda)->value_;
439  ratestat_->sendLegenda(slegenda);
440  }
441  if (sorRef_ && datasetCounting_.value_)
442  {
443  xdata::String dsLegenda = sorRef_->getDatasetCSV();
444  if (dsLegenda.value_.size())
445  ratestat_->sendAuxLegenda(dsLegenda);
446  }
447  }
448  catch(evf::Exception &e)
449  {
450  LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
451  << e.what());
452  }
453  catch (xcept::Exception& e) {
454  LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda."
455  << e.what());
456  }
457  }
458 
459  fsm_.fireEvent("ConfigureDone",this);
460  LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!");
461  localLog("-I- Configuration completed");
462 
463  }
464  }
465  catch (xcept::Exception &e) {
466  reasonForFailedState_ = "configuring FAILED: " + (std::string)e.what();
469  }
470  catch(cms::Exception &e) {
474  }
475  catch(std::exception &e) {
476  reasonForFailedState_ = e.what();
479  }
480  catch(...) {
481  fsm_.fireFailed("Unknown Exception",this);
482  }
483 
484  if(vulture_!=0 && vp_ == 0) vp_ = vulture_->makeProcess();
485 
486  return false;
487 }
488 
489 
490 
491 
492 //______________________________________________________________________________
493 bool FUEventProcessor::enabling(toolbox::task::WorkLoop* wl)
494 {
495  nbTotalDQM_ = 0;
496  scalersUpdates_ = 0;
497  idleProcStats_ = 0;
498  allProcStats_ = 0;
499 // std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
500 // << ((instance_.value_==0) ? 0x8 : 0) << " "
501 // << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
502 // << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
503 // << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
504  unsigned short smap
505  = (datasetCounting_.value_ ? 0x20 : 0 )
506  + ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
507  + (((instance_.value_%80)==0) ? 0x8 : 0) // have at least one legend per slice
508  + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
509  + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
510  + (hasPrescaleService_.value_ ? 0x1 : 0);
511 
512  LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling...");
513 
514  //reset core limit size
516  setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
518  crashesThisRun_=0;
519  //recreate signal monitor sem
520  sem_destroy(sigmon_sem_);
521  sem_init(sigmon_sem_,true,0);
522 
523  if(!epInitialized_){
525  }
526  std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
527 
530 
531  if(!epInitialized_){
534  if(cpustat_) {delete cpustat_; cpustat_=0;}
536  nbSubProcesses_.value_,
537  instance_.value_,
538  iDieUrl_.value_);
539  if(ratestat_) {delete ratestat_; ratestat_=0;}
540  ratestat_ = new RateStat(iDieUrl_.value_);
541  if(iDieStatisticsGathering_.value_)
542  {
543  try
544  {
546  xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
547  if(legenda !=0)
548  {
549  std::string slegenda = ((xdata::String*)legenda)->value_;
550  ratestat_->sendLegenda(slegenda);
551  }
552  if (sorRef_ && datasetCounting_.value_)
553  {
554  xdata::String dsLegenda = sorRef_->getDatasetCSV();
555  if (dsLegenda.value_.size())
556  ratestat_->sendAuxLegenda(dsLegenda);
557  }
558  }
559  catch(evf::Exception &e)
560  {
561  LOG4CPLUS_INFO(getApplicationLogger(),"could not send legenda"
562  << e.what());
563  }
564  catch (xcept::Exception& e) {
565  LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda."
566  << e.what());
567  }
568  }
569  epInitialized_ = true;
570  }
571  configuration_ = evtProcessor_.configuration(); // get it again after init has been carried out...
573  //classic appl will return here
574  if(nbSubProcesses_.value_==0) return enableClassic();
575  //protect manipulation of subprocess array
576  pthread_mutex_lock(&start_lock_);
577  pthread_mutex_lock(&pickup_lock_);
578  subs_.clear();
579  subs_.resize(nbSubProcesses_.value_); // this should not be necessary
580  pid_t retval = -1;
581 
582  for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
583  {
584  subs_[i]=SubProcess(i,retval); //this will replace all the scattered variables
585  }
586  pthread_mutex_unlock(&pickup_lock_);
587 
588  pthread_mutex_unlock(&start_lock_);
589 
590  //set expected number of child EP's for the Init message(s) sent to the SM
591  try {
592  if (sorRef_) {
593  unsigned int nbExpectedEPWriters = nbSubProcesses_.value_;
594  if (nbExpectedEPWriters==0) nbExpectedEPWriters=1;//master instance processing
595  std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
596  for (unsigned int i=0;i<shmOutputs.size();i++) {
597  shmOutputs[i]->setNExpectedEPs(nbExpectedEPWriters);
598  }
599  }
600  }
601  catch (...)
602  {
603  LOG4CPLUS_ERROR(getApplicationLogger(),"Thrown Exception while setting nExpectedEPs in shmOutputs");
604  }
605 
606  //use new method if configured
607  edm_init_done_=true;
608  if (forkInEDM_.value_) {
609  edm_init_done_=false;
610  enableForkInEDM();
611  }
612  else
613  for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
614  {
615  retval = subs_[i].forkNew();
616  if(retval==0)
617  {
618  myProcess_ = &subs_[i];
619  // dirty hack: delete/recreate global binary semaphore for later use in child
621  toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
622  int retval = pthread_mutex_destroy(&stop_lock_);
623  if(retval != 0) perror("error");
624  retval = pthread_mutex_init(&stop_lock_,0);
625  if(retval != 0) perror("error");
627 
628  return enableMPEPSlave();
629  // the loop is broken in the child
630  }
631  }
632 
633  if (forkInEDM_.value_) {
634 
636  while (!edm_init_done_) {
637  usleep(10000);
638  st = evtProcessor_->getState();
640  }
641  st = evtProcessor_->getState();
642  //error handling: EP must fork during sRunning
644  reasonForFailedState_ = std::string("Master edm::EventProcessor in state ") + evtProcessor_->stateName(st);
647  return false;
648  }
649 
650  sleep(1);
652  startSignalMonitorWorkLoop();//only with new forking
653  vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
654 
655  //enable after we are done with conditions loading and forking
656  LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
657  fsm_.fireEvent("EnableDone",this);
658  localLog("-I- Start completed");
659  return false;
660  }
661 
663  vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
664  LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
665  fsm_.fireEvent("EnableDone",this);
666  localLog("-I- Start completed");
667  return false;
668 }
669 
671  //taking care that EP in master is in stoppable state
672  if (forkInfoObj_) {
673 
674  int count = 30;
675  bool waitedForEDM=false;
676  while (!edm_init_done_ && count) {
677  ::sleep(1);
678  if (count%5==0)
679  LOG4CPLUS_WARN(log_,"MASTER EP: Stopping while EP busy in beginRun. waiting " <<count<< "sec");
680  count--;
681  waitedForEDM=true;
682  }
683  //sleep a few more seconds it was early stop
684  if (waitedForEDM) sleep(5);
685 
686  //if (count==0) fsm_.fireFailed("failed to stop Master EP",this);
687 
689  return true;//we are already done
690 
691  forkInfoObj_->lock();
693  sem_post(forkInfoObj_->control_sem_);
694  forkInfoObj_->unlock();
695 
696  count = 30;
697 
699  while(count--) {
700  st = evtProcessor_->getState();
701  if (st==edm::event_processor::sRunning) ::usleep(100000);
703  break;
704  }
705  else {
706  std::ostringstream ost;
707  ost << "Master edm::EventProcessor is in state "<< evtProcessor_->stateName(st) << " while stopping";
708  LOG4CPLUS_ERROR(getApplicationLogger(),ost.str());
709  fsm_.fireFailed(ost.str(),this);
710  return false;
711  }
712  if (count%5==0 && st==edm::event_processor::sRunning && !forkInfoObj_->receivedStop_) {
713  forkInfoObj_->lock();
715  sem_post(forkInfoObj_->control_sem_);
716  forkInfoObj_->unlock();
717  LOG4CPLUS_WARN(getApplicationLogger(),
718  "Master edm::EventProcessor still running after "<< (30-count-1) << " seconds. \"sem_post\" was executed again" );
719  }
720  }
721  if (count<0) {
722  std::ostringstream ost;
724  ost << "Timeout waiting for Master edm::EventProcessor to go stopping state "
725  << evtProcessor_->stateName(st) << ": input source did not receive stop signal!";
726  else
727  ost << "Timeout waiting for Master edm::EventProcessor to go stopping state "<<evtProcessor_->stateName(st);
728  LOG4CPLUS_ERROR(getApplicationLogger(),ost.str());
729  fsm_.fireFailed(ost.str(),this);
730  return false;
731  }
732  }
733  return true;
734 }
735 
736 //______________________________________________________________________________
737 bool FUEventProcessor::stopping(toolbox::task::WorkLoop* wl)
738 {
739  setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
740  if(nbSubProcesses_.value_!=0) {
742  if (forkInEDM_.value_) {
743  //only in new forking for now
744  sem_post(sigmon_sem_);
745  if (!doEndRunInEDM())
746  return false;
747  }
748  }
749  vulture_->stop();
750 
751  if (forkInEDM_.value_) {
752  //shared memory was already disconnected in master
753  bool tmpHasShMem_=hasShMem_;
754  hasShMem_=false;
755  stopClassic();
756  hasShMem_=tmpHasShMem_;
757  return false;
758  }
759  stopClassic();
760  return false;
761 }
762 
763 
764 //______________________________________________________________________________
765 bool FUEventProcessor::halting(toolbox::task::WorkLoop* wl)
766 {
767  LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ...");
768  setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
769  if(nbSubProcesses_.value_!=0) {
771  if (forkInEDM_.value_) {
772  sem_post(sigmon_sem_);
773  if (!doEndRunInEDM())
774  return false;
775 
776  }
777  }
778  try{
780  //cleanup forking variables
781  if (forkInfoObj_) {
782  delete forkInfoObj_;
783  forkInfoObj_=0;
784  }
785  }
786  catch (evf::Exception &e) {
787  reasonForFailedState_ = "halting FAILED: " + (std::string)e.what();
790  }
791  // if(hasShMem_) detachDqmFromShm();
792 
793  LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!");
794  fsm_.fireEvent("HaltDone",this);
795 
796  localLog("-I- Halt completed");
797  return false;
798 }
799 
800 
801 //______________________________________________________________________________
802 xoap::MessageReference FUEventProcessor::fsmCallback(xoap::MessageReference msg)
804 {
805  return fsm_.commandCallback(msg);
806 }
807 
808 
809 //______________________________________________________________________________
811 {
812 
813  if (e.type()=="ItemChangedEvent" && fsm_.stateName()->toString()!="Halted") {
814  std::string item = dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
815 
816  if ( item == "parameterSet") {
817  LOG4CPLUS_WARN(getApplicationLogger(),
818  "HLT Menu changed, force reinitialization of EventProcessor");
819  epInitialized_ = false;
820  }
821  if ( item == "outputEnabled") {
822  if(outprev_ != outPut_) {
823  LOG4CPLUS_WARN(getApplicationLogger(),
824  (outprev_ ? "Disabling " : "Enabling ")<<"global output");
826  outprev_ = outPut_;
827  }
828  }
829  if (item == "globalInputPrescale") {
830  LOG4CPLUS_WARN(getApplicationLogger(),
831  "Setting global input prescale has no effect "
832  <<"in this version of the code");
833  }
834  if ( item == "globalOutputPrescale") {
835  LOG4CPLUS_WARN(getApplicationLogger(),
836  "Setting global output prescale has no effect "
837  <<"in this version of the code");
838  }
839  }
840 
841 }
842 
843 //______________________________________________________________________________
845 {
846  for (unsigned int i=0;i<subs_.size();i++)
847  {
848  if (i!=0) *out << ",";
849  *out << subs_[i].pid();
850  }
851 }
852 //______________________________________________________________________________
854 {
855  using namespace cgicc;
856  pid_t pid = 0;
857  std::ostringstream ost;
858  ost << "&";
859 
860  Cgicc cgi(in);
862  for(std::map<std::string, std::string, std::less<std::string> >::iterator mit =
863  mycgi->getEnvironment().begin();
864  mit != mycgi->getEnvironment().end(); mit++)
865  ost << mit->first << "%" << mit->second << ";";
866  std::vector<FormEntry> els = cgi.getElements() ;
867  std::vector<FormEntry> el1;
868  cgi.getElement("method",el1);
869  std::vector<FormEntry> el2;
870  cgi.getElement("process",el2);
871  if(el1.size()!=0) {
872  std::string meth = el1[0].getValue();
873  if(el2.size()!=0) {
874  unsigned int i = 0;
875  std::string mod = el2[0].getValue();
876  pid = atoi(mod.c_str()); // get the process id to be polled
877  for(; i < subs_.size(); i++)
878  if(subs_[i].pid()==pid) break;
879  if(i>=subs_.size()){ //process was not found, let the browser know
880  *out << "ERROR 404 : Process " << pid << " Not Found !" << std::endl;
881  return;
882  }
883  if(subs_[i].alive() != 1){
884  *out << "ERROR 405 : Process " << pid << " Not Alive !" << std::endl;
885  return;
886  }
887  MsgBuf msg1(meth.length()+ost.str().length()+1,MSQM_MESSAGE_TYPE_WEB);
888  strncpy(msg1->mtext,meth.c_str(),meth.length());
889  strncpy(msg1->mtext+meth.length(),ost.str().c_str(),ost.str().length());
890  subs_[i].post(msg1,true);
891  unsigned int keep_supersleep_original_value = superSleepSec_.value_;
892  superSleepSec_.value_=10*keep_supersleep_original_value;
894  bool done = false;
895  std::vector<char *>pieces;
896  while(!done){
897  unsigned long retval1 = subs_[i].rcvNonBlocking(msg,true);
898  if(retval1 == MSGQ_MESSAGE_TYPE_RANGE){
899  ::sleep(1);
900  continue;
901  }
902  unsigned int nbytes = atoi(msg->mtext);
903  if(nbytes < MAX_PIPE_BUFFER_SIZE) done = true; // this will break the while loop
904  char *buf= new char[nbytes];
905  ssize_t retval = read(anonymousPipe_[PIPE_READ],buf,nbytes);
906  if(retval<0){
907  std::cout << "Failed to read from pipe." << std::endl;
908  continue;
909  }
910  if(static_cast<unsigned int>(retval) != nbytes) std::cout
911  << "CAREFUL HERE, read less bytes than expected from pipe in subWeb" << std::endl;
912  pieces.push_back(buf);
913  }
914  superSleepSec_.value_=keep_supersleep_original_value;
915  for(unsigned int j = 0; j < pieces.size(); j++){
916  *out<<pieces[j]; // chain the buffers into the output strstream
917  delete[] pieces[j]; //make sure to release all buffers used for reading the pipe
918  }
919  }
920  }
921 }
922 
923 
924 //______________________________________________________________________________
927 {
928 
929 
930  *out << "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">"
931  << "<html><head><title>" << getApplicationDescriptor()->getClassName() << (nbSubProcesses_.value_ > 0 ? "MP " : " ")
932  << getApplicationDescriptor()->getInstance() << "</title>"
933  << "<meta http-equiv=\"REFRESH\" content=\"0;url=/evf/html/defaultBasePage.html\">"
934  << "</head></html>";
935 }
936 
937 
938 //______________________________________________________________________________
939 
940 
943 {
944 
945  std::string urn = getApplicationDescriptor()->getURN();
946 
947  *out << "<!-- base href=\"/" << urn
948  << "\"> -->" << std::endl;
949  *out << "<html>" << std::endl;
950  *out << "<head>" << std::endl;
951  *out << "<link type=\"text/css\" rel=\"stylesheet\"";
952  *out << " href=\"/evf/html/styles.css\"/>" << std::endl;
953  *out << "<title>" << getApplicationDescriptor()->getClassName()
954  << getApplicationDescriptor()->getInstance()
955  << " MAIN</title>" << std::endl;
956  *out << "</head>" << std::endl;
957  *out << "<body>" << std::endl;
958  *out << "<table border=\"0\" width=\"100%\">" << std::endl;
959  *out << "<tr>" << std::endl;
960  *out << " <td align=\"left\">" << std::endl;
961  *out << " <img" << std::endl;
962  *out << " align=\"middle\"" << std::endl;
963  *out << " src=\"/evf/images/spoticon.jpg\"" << std::endl;
964  *out << " alt=\"main\"" << std::endl;
965  *out << " width=\"64\"" << std::endl;
966  *out << " height=\"64\"" << std::endl;
967  *out << " border=\"\"/>" << std::endl;
968  *out << " <b>" << std::endl;
969  *out << getApplicationDescriptor()->getClassName()
970  << getApplicationDescriptor()->getInstance() << std::endl;
971  *out << " " << fsm_.stateName()->toString() << std::endl;
972  *out << " </b>" << std::endl;
973  *out << " </td>" << std::endl;
974  *out << " <td width=\"32\">" << std::endl;
975  *out << " <a href=\"/urn:xdaq-application:lid=3\">" << std::endl;
976  *out << " <img" << std::endl;
977  *out << " align=\"middle\"" << std::endl;
978  *out << " src=\"/hyperdaq/images/HyperDAQ.jpg\"" << std::endl;
979  *out << " alt=\"HyperDAQ\"" << std::endl;
980  *out << " width=\"32\"" << std::endl;
981  *out << " height=\"32\"" << std::endl;
982  *out << " border=\"\"/>" << std::endl;
983  *out << " </a>" << std::endl;
984  *out << " </td>" << std::endl;
985  *out << " <td width=\"32\">" << std::endl;
986  *out << " </td>" << std::endl;
987  *out << " <td width=\"32\">" << std::endl;
988  *out << " <a href=\"/" << urn << "/\">" << std::endl;
989  *out << " <img" << std::endl;
990  *out << " align=\"middle\"" << std::endl;
991  *out << " src=\"/evf/images/epicon.jpg\"" << std::endl;
992  *out << " alt=\"main\"" << std::endl;
993  *out << " width=\"32\"" << std::endl;
994  *out << " height=\"32\"" << std::endl;
995  *out << " border=\"\"/>" << std::endl;
996  *out << " </a>" << std::endl;
997  *out << " </td>" << std::endl;
998  *out << "</tr>" << std::endl;
999  *out << "</table>" << std::endl;
1000 
1001  *out << "<hr/>" << std::endl;
1002 
1003  std::ostringstream ost;
1004  if(myProcess_)
1005  ost << "/SubWeb?process=" << getpid() << "&method=moduleWeb&";
1006  else
1007  ost << "/moduleWeb?";
1008  urn += ost.str();
1009  if(evtProcessor_ && (myProcess_ || nbSubProcesses_.value_==0))
1011  else if(evtProcessor_)
1013  else
1014  *out << "<td>HLT Unconfigured</td>" << std::endl;
1015  *out << "</table>" << std::endl;
1016 
1017  *out << "<br><textarea rows=" << 10 << " cols=80 scroll=yes>" << std::endl;
1018  *out << configuration_ << std::endl;
1019  *out << "</textarea><P>" << std::endl;
1020 
1021  *out << "</body>" << std::endl;
1022  *out << "</html>" << std::endl;
1023 
1024 
1025 }
// Body of an HTTP handler; its signature line is not present in this listing.
// It marks the response as a binary octet stream. The statement guarded by
// evtProcessor_ (listing line 1035) is also missing from this listing.
1028 {
1029 
      // Declare the payload as raw binary data in the HTTP response header.
1030  out->getHTTPResponseHeader().addHeader( "Content-Type",
1031  "application/octet-stream" );
1032  out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
1033  "binary" );
      // Guard on the event processor; the guarded statement is not shown here.
1034  if(evtProcessor_ != 0){
1036  }
1037 }
1038 
// Body of a handler; its signature line is not present in this listing.
// Writes the "scalersLegenda" item of scalersLegendaInfoSpace_ to *out,
// followed by a newline. Writes nothing when evtProcessor_ is 0 or the item
// is not found.
1041 {
1042 
1043  if(evtProcessor_ != 0){
1044  xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
1045  if(legenda !=0){
      // NOTE(review): unchecked C-style downcast; assumes the item is stored
      // as an xdata::String - confirm where the item is registered.
1046  std::string slegenda = ((xdata::String*)legenda)->value_;
1047  *out << slegenda << std::endl;
1048  }
1049  }
1050 }
1051 
1052 
// Body of a function whose signature line is not present in this listing.
// Calls setAttachToShm() on the FUShmDQMOutputService. A cms::Exception is
// turned into an evf::Exception, raised after the try block has been left.
// Listing lines 1057-1058 are missing here.
1054 {
1055  std::string errmsg;
1056  try {
1059  edm::Service<FUShmDQMOutputService>()->setAttachToShm();
1060  }
1061  catch (cms::Exception& e) {
1062  errmsg = "Failed to set to attach DQM service to shared memory: " + (std::string)e.what();
1063  }
      // Raise outside the catch block, only if an error message was recorded.
1064  if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
1065 }
1066 
1067 
// Body of a function whose signature line is not present in this listing.
// Calls attachToShm() on the FUShmDQMOutputService. Raises evf::Exception when
// the call returns false or throws cms::Exception.
// Listing lines 1073-1074 are missing here.
1069 {
1070  std::string errmsg;
1071  bool success = false;
1072  try {
1075  success = edm::Service<FUShmDQMOutputService>()->attachToShm();
1076  if (!success) errmsg = "Failed to attach DQM service to shared memory";
1077  }
1078  catch (cms::Exception& e) {
1079  errmsg = "Failed to attach DQM service to shared memory: " + (std::string)e.what();
1080  }
      // A non-empty errmsg covers both failure modes (false return, exception).
1081  if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
1082 }
1083 
1084 
1085 
// Body of a function whose signature line is not present in this listing.
// Calls detachFromShm() on the FUShmDQMOutputService. Raises evf::Exception
// when the call returns false or throws cms::Exception.
// Listing lines 1091-1092 are missing here.
1087 {
1088  std::string errmsg;
1089  bool success = false;
1090  try {
1093  success = edm::Service<FUShmDQMOutputService>()->detachFromShm();
1094  if (!success) errmsg = "Failed to detach DQM service from shared memory";
1095  }
1096  catch (cms::Exception& e) {
1097  errmsg = "Failed to detach DQM service from shared memory: " + (std::string)e.what();
1098  }
      // A non-empty errmsg covers both failure modes (false return, exception).
1099  if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
1100 }
1101 
1102 
// Body of a function whose signature line is not present in this listing.
// Returns the contents of logRing_ as one string, one entry per line.
// Entries from logRingIndex_ to the end of the ring are written first; when
// logWrap_ is set, entries 0 .. logRingIndex_-1 follow.
1104 {
1105  std::ostringstream oss;
1106  if(logWrap_)
1107  {
      // Wrapped ring: tail part first, then the part before the index.
1108  for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
1109  oss << logRing_[i] << std::endl;
1110  for(unsigned int i = 0; i < logRingIndex_; i++)
1111  oss << logRing_[i] << std::endl;
1112  }
1113  else
      // Not wrapped: only the slots from the index to the end are written.
1114  for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
1115  oss << logRing_[i] << std::endl;
1116 
1117  return oss.str();
1118 }
1119 
// Body of a function whose signature line is not present in this listing.
// Appends " at <local date/time>" to m and moves logRingIndex_ one slot
// backwards. When the index is 0 it is first reset to logRingSize_ and
// logWrap_ is set. Listing line 1134 is missing here; presumably it stores m
// in the ring - confirm against the original source.
1121 {
1122  timeval tv;
1123 
1124  gettimeofday(&tv,0);
      // NOTE(review): localtime() returns a pointer to shared static storage;
      // if this can be called from several threads, localtime_r() would be the
      // safer choice - confirm the callers.
1125  tm *uptm = localtime(&tv.tv_sec);
1126  char datestring[256];
1127  strftime(datestring, sizeof(datestring),"%c", uptm);
1128 
      // Step the ring index backwards, wrapping from 0 to the last slot.
1129  if(logRingIndex_ == 0){logWrap_ = true; logRingIndex_ = logRingSize_;}
1130  logRingIndex_--;
1131  std::ostringstream timestamp;
1132  timestamp << " at " << datestring;
1133  m += timestamp.str();
1135 }
1136 
// Body of a function whose signature line is not present in this listing.
// Obtains the "Supervisor" work loop of type "waiting", activates it if it is
// not active, submits FUEventProcessor::supervisor to it and sets
// supervising_. An xcept::Exception is rethrown as evf::Exception.
// Listing line 1140 (the left-hand side of the getWorkLoop call) is missing.
1138 {
1139  try {
1141  toolbox::task::getWorkLoopFactory()->getWorkLoop("Supervisor",
1142  "waiting");
1143  if (!wlSupervising_->isActive()) wlSupervising_->activate();
1144  asSupervisor_ = toolbox::task::bind(this,&FUEventProcessor::supervisor,
1145  "Supervisor");
1146  wlSupervising_->submit(asSupervisor_);
1147  supervising_ = true;
1148  }
1149  catch (xcept::Exception& e) {
1150  std::string msg = "Failed to start workloop 'Supervisor'.";
1151  XCEPT_RETHROW(evf::Exception,msg,e);
1152  }
1153 }
1154 
// Body of a function whose signature line is not present in this listing.
// Obtains the "Receiving" work loop of type "waiting", activates it if it is
// not active, binds FUEventProcessor::receiving as its action and sets
// receiving_. An xcept::Exception is rethrown as evf::Exception.
1156 {
1157  try {
1158  wlReceiving_=
1159  toolbox::task::getWorkLoopFactory()->getWorkLoop("Receiving",
1160  "waiting");
1161  if (!wlReceiving_->isActive()) wlReceiving_->activate();
1162  asReceiveMsgAndExecute_ = toolbox::task::bind(this,&FUEventProcessor::receiving,
1163  "Receiving");
      // Listing line 1164 is missing here; presumably it submits the action
      // to the work loop - confirm against the original source.
1165  receiving_ = true;
1166  }
1167  catch (xcept::Exception& e) {
1168  std::string msg = "Failed to start workloop 'Receiving'.";
1169  XCEPT_RETHROW(evf::Exception,msg,e);
1170  }
1171 }
// Body of a function whose signature line is not present in this listing.
// Obtains the "ReceivingM" work loop of type "waiting", activates it if it is
// not active, binds FUEventProcessor::receivingAndMonitor and sets
// receivingM_. An xcept::Exception is rethrown as evf::Exception.
// Listing lines 1175, 1180 and 1183 are missing here (presumably the two
// assignment targets and the submit call - confirm against the original).
1173 {
1174  try {
1176  toolbox::task::getWorkLoopFactory()->getWorkLoop("ReceivingM",
1177  "waiting");
1178  if (!wlReceivingMonitor_->isActive())
1179  wlReceivingMonitor_->activate();
1181  toolbox::task::bind(this,&FUEventProcessor::receivingAndMonitor,
1182  "ReceivingM");
1184  receivingM_ = true;
1185  }
1186  catch (xcept::Exception& e) {
1187  std::string msg = "Failed to start workloop 'ReceivingM'.";
1188  XCEPT_RETHROW(evf::Exception,msg,e);
1189  }
1190 }
1191 
// Work-loop callback of a slave process: receives one message from the master
// through myProcess_->rcvSlave() and reacts to its mtype:
//   MSQM_MESSAGE_TYPE_RLI   - set the soft RLIMIT_CORE limit to 0
//   MSQM_MESSAGE_TYPE_RLR   - restore rlimit_coresize_default_
//   MSQM_MESSAGE_TYPE_STOP  - fire "Stop", run stopClassic(), reset
//                             messageServicePresence_, post a reply, _exit()
//   MSQM_MESSAGE_TYPE_FSTOP - _exit() immediately
// Returns true on every path that does not terminate the process. Only
// evf::Exception is caught by the outer handler.
// Listing lines 1203, 1209 and 1237 are missing from this listing.
1192 bool FUEventProcessor::receiving(toolbox::task::WorkLoop *)
1193 {
1194  MsgBuf msg;
1195  try{
1196  myProcess_->rcvSlave(msg,false); //will receive only messages from Master
1197  if(msg->mtype==MSQM_MESSAGE_TYPE_RLI)
1198  {
      // Keep the hard limit as read, only zero the soft core-size limit.
1199  rlimit rl;
1200  getrlimit(RLIMIT_CORE,&rl);
1201  rl.rlim_cur=0;
1202  setrlimit(RLIMIT_CORE,&rl);
1204  }
1205  if (msg->mtype==MSQM_MESSAGE_TYPE_RLR)
1206  {
1207  //reset coresize limit
1208  setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
1210  }
1211  if(msg->mtype==MSQM_MESSAGE_TYPE_STOP)
1212  {
      // stop_lock_ is held for the whole stop sequence and released just
      // before the process exits.
1213  pthread_mutex_lock(&stop_lock_);
1214  try {
1215  fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
1216  }
1217  catch (...) {
1218  LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to go to Stopping state in slave EP, pid "
1219  << getpid() << " The state on Stop event was not consistent");
1220  }
1221 
      // Each step below is best-effort: failures are logged and the
      // shutdown continues.
1222  try {
1223  stopClassic(); // call the normal sequence of stopping - as this is allowed to fail provisions must be made ...@@@EM
1224  }
1225  catch (...) {
1226  LOG4CPLUS_ERROR(getApplicationLogger(),"Slave EP 'receiving' workloop: exception " << getpid());
1227  }
1228 
1229  //destroy MessageService thread before exit
1230  try{
1231  messageServicePresence_.reset();
1232  }
1233  catch(...) {
1234  LOG4CPLUS_ERROR(getApplicationLogger(),"SLAVE:Unable to destroy MessageServicePresence. pid:" << getpid() );
1235  }
1236 
      // msg1 is declared on listing line 1237, which is missing here.
1238  myProcess_->postSlave(msg1,false);
1239  pthread_mutex_unlock(&stop_lock_);
1240  fclose(stdout);
1241  fclose(stderr);
1242  _exit(EXIT_SUCCESS);
1243  }
1244  if(msg->mtype==MSQM_MESSAGE_TYPE_FSTOP)
1245  _exit(EXIT_SUCCESS);
1246  }
1247  catch(evf::Exception &e){
1248  LOG4CPLUS_ERROR(getApplicationLogger(),"Slave EP pid:" << getpid() << " receiving WorkLoop exception: "<<e.what());
1249  }
1250  return true;
1251 }
1252 
// Work-loop callback of the master process that supervises the sub-processes.
// One pass does, in order:
//  1. under stop_lock_: resize subs_ and the state vectors to
//     nbSubProcesses_, then reap exited children with waitpid(WNOHANG) and
//     record why each one ended;
//  2. return true right away when the FSM state is "stopping";
//  3. while "Enabled" with changed core limits: restore the default core size
//     limit once more than 15 minutes have passed since lastCrashTime_;
//  4. while "Enabled" and edm_init_done_, if autoRestartSlaves_ is set:
//     restart dead slaves whose countdown reached 0;
//  5. while "Enabled" and edm_init_done_: poll every live slave for its
//     counters and states, and force-stop slaves that stay in state 0;
//  6. fire the monitoring info-space update and sleep superSleepSec_ seconds.
// Returns true to keep looping; returns false only in a freshly forked child
// (see the comment at the return statement in the restart branch).
// Several listing lines are missing from this listing (for example 1265,
// 1297, 1329, 1360, 1429, 1466-1468, 1474, 1543, 1553).
1253 bool FUEventProcessor::supervisor(toolbox::task::WorkLoop *)
1254 {
1255  pthread_mutex_lock(&stop_lock_);
1256  if(subs_.size()!=nbSubProcesses_.value_)
1257  {
      // Size is checked again after taking pickup_lock_ before resizing.
1258  pthread_mutex_lock(&pickup_lock_);
1259  if(subs_.size()!=nbSubProcesses_.value_) {
1260  subs_.resize(nbSubProcesses_.value_);
1261  spMStates_.resize(nbSubProcesses_.value_);
1262  spmStates_.resize(nbSubProcesses_.value_);
1263  for(unsigned int i = 0; i < spMStates_.size(); i++)
1264  {
1266  spmStates_[i] = 0;
1267  }
1268  }
1269  pthread_mutex_unlock(&pickup_lock_);
1270  }
1271  bool running = fsm_.stateName()->toString()=="Enabled";
1272  bool stopping = fsm_.stateName()->toString()=="stopping";
      // Reap pass: a status of -1000 marks a slot that is skipped here.
1273  for(unsigned int i = 0; i < subs_.size(); i++)
1274  {
1275  if(subs_[i].alive()==-1000) continue;
1276  int sl;
1277  pid_t sub_pid = subs_[i].pid();
1278  pid_t killedOrNot = waitpid(sub_pid,&sl,WNOHANG);
1279 
      // waitpid returns the child's pid only when that child changed state.
1280  if(killedOrNot && killedOrNot==sub_pid) {
1281  pthread_mutex_lock(&pickup_lock_);
1282  //check if out of range or recreated (enable can clear vector)
1283  if (i<subs_.size() && subs_[i].alive()!=-1000) {
      // Status 0 for a normal exit, -1 otherwise.
1284  subs_[i].setStatus((WIFEXITED(sl) != 0 ? 0 : -1));
1285  std::ostringstream ost;
1286  if(subs_[i].alive()==0) ost << " process exited with status " << WEXITSTATUS(sl);
1287  else if(WIFSIGNALED(sl)!=0) {
1288  ost << " process terminated with signal " << WTERMSIG(sl);
1289  }
1290  else ost << " process stopped ";
1291  //report unexpected slave exit in stop
1292  //if (stopping && (WEXITSTATUS(sl)!=0 || WIFSIGNALED(sl)!=0)) {
1293  // LOG4CPLUS_WARN(getApplicationLogger(),ost.str() << ", slave pid:"<<getpid());
1294  //}
      // Arm the restart countdown and remember the reason for the log.
1295  subs_[i].countdown()=slaveRestartDelaySecs_.value_;
1296  subs_[i].setReasonForFailed(ost.str());
1298  spmStates_[i] = 0;
1299  std::ostringstream ost1;
1300  ost1 << "-E- Slave " << subs_[i].pid() << ost.str();
1301  localLog(ost1.str());
1302  if(!autoRestartSlaves_.value_) subs_[i].disconnect();
1303  }
1304  pthread_mutex_unlock(&pickup_lock_);
1305  }
1306  }
1307  pthread_mutex_unlock(&stop_lock_);
1308  if(stopping) return true; // if in stopping we are done
1309 
1310  // check if we need to reset core dumps (15 min after last one)
1311  if (running && rlimit_coresize_changed_) {
1312  timeval newtv;
1313  gettimeofday(&newtv,0);
1314  int delta = newtv.tv_sec-lastCrashTime_.tv_sec;
1315  if (delta>60*15) {
1316  std::ostringstream ostr;
1317  ostr << " No more slave EP crashes on this machine in last 15 min. resetting core size limits";
1318  std::cout << ostr.str() << std::endl;
1319  LOG4CPLUS_INFO(getApplicationLogger(),ostr.str());
1320  setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
1321  MsgBuf master_message_rlr_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLR);
1322  for (unsigned int i = 0; i < subs_.size(); i++) {
1323  try {
      // NOTE(review): this tests alive() for non-zero, while the rest of
      // this function treats alive()>0 as "live" and uses -1 / -1000 for
      // dead slots, so dead slots may also be posted to here. The
      // catch(...) below hides any resulting failure - confirm intent.
1324  if (subs_[i].alive())
1325  subs_[i].post(master_message_rlr_,false);
1326  }
1327  catch (...) {}
1328  }
1330  crashesThisRun_=0;
1331  }
1332  }
1333 
1334  if(running && edm_init_done_)
1335  {
1336  // if enabled, this loop will periodically check if dead slaves countdown has expired and restart them
1337  // this is only active while running, hence, the stop lock is acquired and only released at end of loop
1338  if(autoRestartSlaves_.value_){
1339  pthread_mutex_lock(&stop_lock_); //lockout slave killing at stop while you check for restarts
1340  for(unsigned int i = 0; i < subs_.size(); i++)
1341  {
1342  if(subs_[i].alive() != 1){
1343  if(subs_[i].countdown() == 0)
1344  {
      // Give up on a slot once its restart count exceeds 2: log, notify,
      // push the countdown below 0 and disconnect the slot.
1345  if(subs_[i].restartCount()>2){
1346  LOG4CPLUS_WARN(getApplicationLogger()," Not restarting subprocess in slot " << i
1347  << " - maximum restart count reached");
1348  std::ostringstream ost1;
1349  ost1 << "-W- Dead Process in slot " << i << " reached maximum restart count";
1350  localLog(ost1.str());
1351  subs_[i].countdown()--;
1352  XCEPT_DECLARE(evf::Exception,
1353  sentinelException, ost1.str());
1354  notifyQualified("error",sentinelException);
1355  subs_[i].disconnect();
1356  continue;
1357  }
1358  subs_[i].restartCount()++;
      // The forkInEDM_ branch body (listing line 1360) is missing here.
1359  if (forkInEDM_.value_) {
1361  }
1362  else {
1363  pid_t rr = subs_[i].forkNew();
1364  if(rr==0)
1365  {
      // Child side of the fork: re-create stop_lock_, walk the FSM back
      // to Enable and start processing as a slave.
1366  myProcess_=&subs_[i];
1367  scalersUpdates_ = 0;
1368  int retval = pthread_mutex_destroy(&stop_lock_);
1369  if(retval != 0) perror("error");
1370  retval = pthread_mutex_init(&stop_lock_,0);
1371  if(retval != 0) perror("error");
1373  fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
1374  fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition
1375  fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition
1376  try{
1377  xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
1378  if(lsid) {
1379  ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died
1380  }
1381  }
1382  catch(...){
1383  std::cout << "trouble with lsindex during restart" << std::endl;
1384  }
1385  try{
1386  xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
1387  if(lstb) {
1388  ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting
1389  }
1390  }
1391  catch(...){
1392  std::cout << "trouble with resetting flag for eol recovery " << std::endl;
1393  }
1394 
1397  enableMPEPSlave();
1398  return false; // exit the supervisor loop immediately in the child !!!
1399  }
1400  else
1401  {
1402  std::ostringstream ost1;
1403  ost1 << "-I- New Process " << rr << " forked for slot " << i;
1404  localLog(ost1.str());
1405  }
1406  }
1407  }
      // One tick of the restart countdown per supervisor pass.
1408  if(subs_[i].countdown()>=0) subs_[i].countdown()--;
1409  }
1410  }
1411  pthread_mutex_unlock(&stop_lock_);
1412  } // finished handling replacement of dead slaves once they've been reaped
1413  }
1414  xdata::Serializable *lsid = 0;
1415  xdata::Serializable *psid = 0;
1416  xdata::Serializable *dqmp = 0;
1417  xdata::UnsignedInteger32 *dqm = 0;
1418 
1419 
1420 
1421  if(running && edm_init_done_){
      // nbProcessed / nbAccepted are not declared in the visible lines of
      // this function; their declaration is not shown in this listing.
1422  try{
1423  lsid = applicationInfoSpace_->find("lumiSectionIndex");
1424  psid = applicationInfoSpace_->find("prescaleSetIndex");
1425  nbProcessed = monitorInfoSpace_->find("nbProcessed");
1426  nbAccepted = monitorInfoSpace_->find("nbAccepted");
1427  dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
1428  }
1430  LOG4CPLUS_INFO(getApplicationLogger(),"could not retrieve some data - " << e.what());
1431  }
1432 
1433  try{
1434  if(nbProcessed !=0 && nbAccepted !=0)
1435  {
1436  xdata::UnsignedInteger32*nbp = ((xdata::UnsignedInteger32*)nbProcessed);
1437  xdata::UnsignedInteger32*nba = ((xdata::UnsignedInteger32*)nbAccepted);
      // NOTE(review): lsid and psid are not null-checked, yet ls and ps are
      // dereferenced further down (listing lines 1483-1484). If either
      // find() returned 0 that is a null dereference - confirm both items
      // are always registered.
1438  xdata::UnsignedInteger32*ls = ((xdata::UnsignedInteger32*)lsid);
1439  xdata::UnsignedInteger32*ps = ((xdata::UnsignedInteger32*)psid);
1440  if(dqmp!=0)
1441  dqm = (xdata::UnsignedInteger32*)dqmp;
      // Totals are rebuilt from scratch on every pass.
1442  if(dqm) dqm->value_ = 0;
1443  nbTotalDQM_ = 0;
1444  nbp->value_ = 0;
1445  nba->value_ = 0;
1446  nblive_ = 0;
1447  nbdead_ = 0;
1448  scalersUpdates_ = 0;
1449 
1450  for(unsigned int i = 0; i < subs_.size(); i++)
1451  {
1452  if(subs_[i].alive()>0)
1453  {
1454  nblive_++;
1455  try{
      // Ask the slave for its parameters and try to read the reply
      // without blocking.
1456  subs_[i].post(master_message_prg_,true);
1457 
1458  unsigned long retval = subs_[i].rcvNonBlocking(master_message_prr_,true);
1459  if(retval == (unsigned long) master_message_prr_->mtype){
1460  prg* p = (struct prg*)(master_message_prr_->mtext);
1461  subs_[i].setParams(p);
1462  spMStates_[i] = p->Ms;
1463  spmStates_[i] = p->ms;
1464  cpustat_->addEntry(p->ms);
      // The rest of this condition (listing lines 1466-1468) is missing
      // from this listing.
1465  if(!subs_[i].inInconsistentState() &&
1469  {
1470  std::ostringstream ost;
1471  ost << "edm::eventprocessor slot " << i << " process id "
1472  << subs_[i].pid() << " not in Running state : Mstate="
1473  << evtProcessor_.stateNameFromIndex(p->Ms) << " mstate="
1475  << " - Look into possible error messages from HLT process";
1476  LOG4CPLUS_WARN(getApplicationLogger(),ost.str());
1477  }
1478  nbp->value_ += subs_[i].params().nbp;
1479  nba->value_ += subs_[i].params().nba;
1480  if(dqm)dqm->value_ += p->dqm;
1481  nbTotalDQM_ += p->dqm;
1482  scalersUpdates_ += p->trp;
1483  if(p->ls > ls->value_) ls->value_ = p->ls;
1484  if(p->ps != ps->value_) ps->value_ = p->ps;
1485  }
1486  else{
      // No matching reply: fall back to the slave's saved counters.
1487  nbp->value_ += subs_[i].get_save_nbp();
1488  nba->value_ += subs_[i].get_save_nba();
1489  }
1490  }
1491  catch(evf::Exception &e){
1492  LOG4CPLUS_INFO(getApplicationLogger(),
1493  "could not send/receive msg on slot "
1494  << i << " - " << e.what());
1495  }
1496 
1497  }
1498  else
1499  {
      // Dead slot: its saved counters still contribute to the totals.
1500  nbp->value_ += subs_[i].get_save_nbp();
1501  nba->value_ += subs_[i].get_save_nba();
1502  nbdead_++;
1503  }
1504  }
1505  if(nbp->value_>64){//have some slaves already processed more than one event ? (eventually make this == number of raw cells)
1506  for(unsigned int i = 0; i < subs_.size(); i++)
1507  {
1508  if(subs_[i].params().nbp == 0){ // a slave has processed 0 events
1509  // check that the process is not stuck
1510  if(subs_[i].alive()>0 && subs_[i].params().ms == 0) // the process is seen alive but in us=Invalid(0)
1511  {
1512  subs_[i].found_invalid();//increase the "found_invalid" counter
1513  if(subs_[i].nfound_invalid() > 60){ //wait x monitor cycles (~1 min a good time ?) before doing something about a stuck slave
1514  MsgBuf msg3(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_FSTOP); // send a force-stop signal
1515  subs_[i].post(msg3,false);
1516  std::ostringstream ost1;
1517  ost1 << "-W- Process in slot " << i << " Never reached the running state - forcestopping it";
1518  localLog(ost1.str());
1519  LOG4CPLUS_ERROR(getApplicationLogger(),ost1.str());
1520  XCEPT_DECLARE(evf::Exception,
1521  sentinelException, ost1.str());
1522  notifyQualified("error",sentinelException);
1523 
1524  }
1525  }
1526  }
1527  }
1528  }
1529  }
1530  }
1531  catch(std::exception &e){
1532  LOG4CPLUS_INFO(getApplicationLogger(),"std exception - " << e.what());
1533  }
1534  catch(...){
1535  LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception ");
1536  }
1537  }
1538  else{
      // Not running: clear the recorded state of slots marked -1000.
1539  for(unsigned int i = 0; i < subs_.size(); i++)
1540  {
1541  if(subs_[i].alive()==-1000)
1542  {
1544  spmStates_[i] = 0;
1545  }
1546  }
1547  }
      // Publish the monitored items; the catch clause header (listing line
      // 1553) is missing from this listing.
1548  try{
1549  monitorInfoSpace_->lock();
1550  monitorInfoSpace_->fireItemGroupChanged(names_,0);
1551  monitorInfoSpace_->unlock();
1552  }
1554  {
1555  LOG4CPLUS_ERROR(log_, "Exception from fireItemGroupChanged: " << e.what());
1556  // localLog(e.what());
1557  }
      // Pace the supervisor loop.
1558  ::sleep(superSleepSec_.value_);
1559  return true;
1560 }
1561 
// Body of a function whose signature line is not present in this listing.
// Obtains the "Scalers" work loop of type "waiting", activates it if it is not
// active, submits FUEventProcessor::scalers to it and sets wlScalersActive_.
// An xcept::Exception is rethrown as evf::Exception.
1563 {
1564  try {
1565  wlScalers_=
1566  toolbox::task::getWorkLoopFactory()->getWorkLoop("Scalers",
1567  "waiting");
1568  if (!wlScalers_->isActive()) wlScalers_->activate();
1569  asScalers_ = toolbox::task::bind(this,&FUEventProcessor::scalers,
1570  "Scalers");
1571 
1572  wlScalers_->submit(asScalers_);
1573  wlScalersActive_ = true;
1574  }
1575  catch (xcept::Exception& e) {
1576  std::string msg = "Failed to start workloop 'Scalers'.";
1577  XCEPT_RETHROW(evf::Exception,msg,e);
1578  }
1579 }
1580 
1581 //______________________________________________________________________________
1582 
// Body of a function whose signature line is not present in this listing.
// Obtains the "Summary" work loop of type "waiting", activates it if it is not
// active, submits FUEventProcessor::summarize to it and sets
// wlSummarizeActive_. An xcept::Exception is rethrown as evf::Exception.
1584 {
1585  try {
1586  wlSummarize_=
1587  toolbox::task::getWorkLoopFactory()->getWorkLoop("Summary",
1588  "waiting");
1589  if (!wlSummarize_->isActive()) wlSummarize_->activate();
1590 
1591  asSummarize_ = toolbox::task::bind(this,&FUEventProcessor::summarize,
1592  "Summary");
1593 
1594  wlSummarize_->submit(asSummarize_);
1595  wlSummarizeActive_ = true;
1596  }
1597  catch (xcept::Exception& e) {
      // The loop is registered as "Summary"; this message calls it 'Summarize'.
1598  std::string msg = "Failed to start workloop 'Summarize'.";
1599  XCEPT_RETHROW(evf::Exception,msg,e);
1600  }
1601 }
1602 
1603 //______________________________________________________________________________
1604 
// Work-loop callback for the scalers loop. Returns false, after clearing
// wlScalersActive_, when there is no event processor or when
// getTriggerReport(true) fails; otherwise returns true.
// With myProcess_ set, a failed post is reported on std::cout and
// scalersUpdates_ is incremented.
// Listing lines 1623 (presumably the post that defines ret) and 1628 (the
// body of the else branch) are missing from this listing.
1605 bool FUEventProcessor::scalers(toolbox::task::WorkLoop* wl)
1606 {
1607  if(evtProcessor_)
1608  {
1609  if(!evtProcessor_.getTriggerReport(true)) {
1610  wlScalersActive_ = false;
1611  return false;
1612  }
1613  }
1614  else
1615  {
1616  std::cout << getpid()<< " Scalers workloop, bailing out, no evtProcessor " << std::endl;
1617  wlScalersActive_ = false;
1618  return false;
1619  }
1620  if(myProcess_)
1621  {
1622  // std::cout << getpid() << "going to post on control queue from scalers" << std::endl;
1624  if(ret!=0) std::cout << "scalers workloop, error posting to sqs_ " << errno << std::endl;
1625  scalersUpdates_++;
1626  }
1627  else
1629  return true;
1630 }
1631 
1632 //______________________________________________________________________________
// Work-loop callback that gathers trigger reports from the live sub-processes.
// For every slot with alive()>0 it uses a postponed report if one is pending,
// otherwise it calls rcv() until a message of type MSQS_MESSAGE_TYPE_TRR
// arrives. Going by the log text, reports for a different lumi section are
// postponed. If at least one process was updated, nbSubProcessesReporting_ is
// set to the message count; otherwise a warning is logged and the loop sleeps
// 10 seconds. Returns false once the FSM state is no longer "Enabled".
// With iDieStatisticsGathering_ set, CPU and rate statistics are sent.
// Many listing lines are missing from this listing (for example 1635, 1644,
// 1665-1666, 1675-1676, 1682, 1690-1693, 1710, 1712, 1727-1729, 1732).
1633 bool FUEventProcessor::summarize(toolbox::task::WorkLoop* wl)
1634 {
1636  bool atLeastOneProcessUpdatedSuccessfully = false;
1637  int msgCount = 0;
1638  for (unsigned int i = 0; i < subs_.size(); i++)
1639  {
1640  if(subs_[i].alive()>0)
1641  {
1642  int ret = 0;
1643  if(subs_[i].check_postponed_trigger_update(master_message_trr_,
1645  {
1646  ret = MSQS_MESSAGE_TYPE_TRR;
1647  std::cout << "using postponed report from slot " << i << " for ls " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
1648  }
1649  else{
      // Keep receiving until a TRR message arrives; an exception from
      // rcv() abandons this slot for the current pass.
1650  bool insync = false;
1651  bool exception_caught = false;
1652  while(!insync){
1653  try{
1654  ret = subs_[i].rcv(master_message_trr_,false);
1655  }
1656  catch(evf::Exception &e)
1657  {
1658  std::cout << "exception in msgrcv on " << i
1659  << " " << subs_[i].alive() << " " << strerror(errno) << std::endl;
1660  exception_caught = true;
1661  break;
1662  //do nothing special
1663  }
1664  if(ret==MSQS_MESSAGE_TYPE_TRR) {
1667  insync = true;
1668  }
1669  }
1670  }
1671  if(exception_caught) continue;
1672  }
1673  msgCount++;
1674  if(ret==MSQS_MESSAGE_TYPE_TRR) {
      // The condition choosing between postponing and updating (listing
      // lines 1675-1676) is missing from this listing.
1677  std::cout << "postpone handling of msg from slot " << i << " with Ls " << trp->lumiSection
1678  << " should be " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
1679  subs_[i].add_postponed_trigger_update(master_message_trr_);
1680  }else{
1681  atLeastOneProcessUpdatedSuccessfully = true;
1683  }
1684  }
1685  else std::cout << "msgrcv returned error " << errno << std::endl;
1686  }
1687  }
1688  if(atLeastOneProcessUpdatedSuccessfully){
1689  nbSubProcessesReporting_.value_ = msgCount;
1694  }
1695  else{
1696  LOG4CPLUS_WARN(getApplicationLogger(),"Summarize loop: no process updated successfully - sleep 10 seconds before trying again");
1697  if(msgCount==0) evtProcessor_.withdrawLumiSectionIncrement();
1698  nbSubProcessesReporting_.value_ = 0;
1699  ::sleep(10);
1700  }
1701  if(fsm_.stateName()->toString()!="Enabled"){
      // NOTE(review): this clears wlScalersActive_, but the starter for this
      // loop sets wlSummarizeActive_. It looks like wlSummarizeActive_ was
      // meant here - confirm before changing.
1702  wlScalersActive_ = false;
1703  return false;
1704  }
1705  // cpustat_->printStat();
1706  if(iDieStatisticsGathering_.value_){
1707  try{
      // Remember the previous counters; the refresh of idleProcStats_ /
      // allProcStats_ is on listing lines missing from this listing.
1708  unsigned long long idleTmp=idleProcStats_;
1709  unsigned long long allPSTmp=allProcStats_;
1711 
1713  timeval oldtime=lastProcReport_;
1714  gettimeofday(&lastProcReport_,0);
1715 
      // CPU usage in per-mille: 1000 minus the idle share of the delta.
      // The guard avoids a division by zero when the total did not change.
1716  if (allPSTmp!=0 && idleTmp!=0 && allProcStats_!=allPSTmp) {
1717  cpustat_->setCPUStat(1000 - ((idleProcStats_-idleTmp)*1000)/(allProcStats_-allPSTmp));
1718  int deltaTms=1000 * (lastProcReport_.tv_sec-oldtime.tv_sec)
1719  + (lastProcReport_.tv_usec-oldtime.tv_usec)/1000;
1720  cpustat_->setElapsed(deltaTms);
1721  }
1722  else {
1723  cpustat_->setCPUStat(0);
1724  cpustat_->setElapsed(0);
1725  }
1726 
1730  ratestat_->sendStat((unsigned char*)trsp,
1731  sizeof(TriggerReportStatic),
1733  }catch(evf::Exception &e){
1734  LOG4CPLUS_INFO(getApplicationLogger(),"coud not send statistics"
1735  << e.what());
1736  }
1737  }
1738  cpustat_->reset();
1739  return true;
1740 }
1741 
1742 
1743 
// Work-loop callback of a slave process for monitoring traffic: receives one
// message from the master into slave_message_monitoring_ and answers by type:
//   MSQM_MESSAGE_TYPE_MCS - reply with the output of evtProcessor_.microState
//   MSQM_MESSAGE_TYPE_PRG - fill a prg record with counters and states; with
//                           exitOnError_ set and the state sError, wait for
//                           the FSM to leave "Enabled" and _exit(-1) after
//                           more than 5 retries
//   MSQM_MESSAGE_TYPE_WEB - run the requested web method and send its output
//                           back in chunks of at most MAX_PIPE_BUFFER_SIZE
//   MSQM_MESSAGE_TYPE_TRP - no action
// Always returns true; only evf::Exception is caught by the outer handler.
// Several listing lines are missing from this listing (for example 1765,
// 1775, 1784, 1810-1811, 1813, 1866).
1744 bool FUEventProcessor::receivingAndMonitor(toolbox::task::WorkLoop *)
1745 {
1746  try{
1747  myProcess_->rcvSlave(slave_message_monitoring_,true); //will receive only messages from Master
1748  switch(slave_message_monitoring_->mtype)
1749  {
1750  case MSQM_MESSAGE_TYPE_MCS:
1751  {
1752  xgi::Input *in = 0;
1753  xgi::Output out;
1754  evtProcessor_.microState(in,&out);
1755  MsgBuf msg1(out.str().size(),MSQS_MESSAGE_TYPE_MCR);
      // NOTE(review): exactly size() bytes are copied, so no terminating
      // NUL is written - confirm the receiver does not rely on one.
1756  strncpy(msg1->mtext,out.str().c_str(),out.str().size());
1757  myProcess_->postSlave(msg1,true);
1758  break;
1759  }
1760 
1761  case MSQM_MESSAGE_TYPE_PRG:
1762  {
1763  xdata::Serializable *dqmp = 0;
1764  xdata::UnsignedInteger32 *dqm = 0;
1766  try{
1767  dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
      // NOTE(review): the exception is caught by value and silently
      // ignored; dqmp then stays 0 and data->dqm is reported as 0.
1768  } catch(xdata::exception::Exception e){}
1769  if(dqmp!=0)
1770  dqm = (xdata::UnsignedInteger32*)dqmp;
1771 
1772  // monitorInfoSpace_->lock();
      // The reply record is built in place inside slave_message_prr_.
1773  prg * data = (prg*)slave_message_prr_->mtext;
1774  data->ls = evtProcessor_.lsid_;
1776  data->ps = evtProcessor_.psid_;
1777  data->nbp = evtProcessor_->totalEvents();
1778  data->nba = evtProcessor_->totalEventsPassed();
1779  data->Ms = evtProcessor_.epMAltState_.value_;
1780  data->ms = evtProcessor_.epmAltState_.value_;
1781  if(dqm) data->dqm = dqm->value_; else data->dqm = 0;
1782  data->trp = scalersUpdates_;
1783  // monitorInfoSpace_->unlock();
1785  if(exitOnError_.value_)
1786  {
1787  // after each monitoring cycle check if we are in inconsistent state and exit if configured to do so
1788  // std::cout << getpid() << "receivingAndMonitor: trying to acquire stop lock " << std::endl;
1789  if(data->Ms == edm::event_processor::sError)
1790  {
      // Poll once per second, under stop_lock_, until the FSM leaves
      // "Enabled"; exit the process after more than 5 retries.
1791  bool running = true;
1792  int count = 0;
1793  while(running){
1794  int retval = pthread_mutex_lock(&stop_lock_);
1795  if(retval != 0) perror("error");
1796  running = fsm_.stateName()->toString()=="Enabled";
1797  if(count>5) _exit(-1);
1798  pthread_mutex_unlock(&stop_lock_);
1799  if(running) {::sleep(1); count++;}
1800  }
1801  }
1802  }
1803  break;
1804  }
1805  case MSQM_MESSAGE_TYPE_WEB:
1806  {
1807  xgi::Input *in = 0;
1808  xgi::Output out;
1809  unsigned int bytesToSend = 0;
      // query, method and msg1 are declared on listing lines missing from
      // this listing. The query is split at the first '&' into a method
      // name and its arguments.
1812  size_t pos = query.find_first_of("&");
1814  std::string args;
1815  if(pos!=std::string::npos)
1816  {
1817  method = query.substr(0,pos);
1818  args = query.substr(pos+1,query.length()-pos-1);
1819  }
1820  else
1821  method=query;
1822 
1823  if(method=="Spotlight")
1824  {
1825  spotlightWebPage(in,&out);
1826  }
1827  else if(method=="procStat")
1828  {
1829  procStat(in,&out);
1830  }
1831  else if(method=="moduleWeb")
1832  {
      // Arguments arrive as ';'-separated "name%value" pairs and are
      // copied into the CGI environment handed to moduleWeb().
1833  internal::MyCgi mycgi;
1834  boost::char_separator<char> sep(";");
1835  boost::tokenizer<boost::char_separator<char> > tokens(args, sep);
1836  for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
1837  tok_iter != tokens.end(); ++tok_iter){
1838  size_t pos = (*tok_iter).find_first_of("%");
1839  if(pos != std::string::npos){
1840  std::string first = (*tok_iter).substr(0 , pos);
1841  std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
1842  mycgi.getEnvironment()[first]=second;
1843  }
1844  }
1845  moduleWeb(&mycgi,&out);
1846  }
1847  else if(method=="Default")
1848  {
1849  defaultWebPage(in,&out);
1850  }
1851  else
1852  {
1853  out << "Error 404!!!!!!!!" << std::endl;
1854  }
1855 
1856 
1857  bytesToSend = out.str().size();
1858  unsigned int cycle = 0;
      // Empty output is still answered, with a size message of "0".
      // NOTE(review): "%d" is used with unsigned int arguments here and
      // below; "%u" would match the argument type.
1859  if(bytesToSend==0)
1860  {
1861  snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", bytesToSend);
1862  myProcess_->postSlave(msg1,true);
1863  }
      // Send the page in chunks; after each chunk the chunk size is
      // posted as a text message. The call that ships the chunk itself
      // (listing line 1866) is missing from this listing.
1864  while(bytesToSend !=0){
1865  unsigned int msgSize = bytesToSend>MAX_PIPE_BUFFER_SIZE ? MAX_PIPE_BUFFER_SIZE : bytesToSend;
1867  out.str().c_str()+MAX_PIPE_BUFFER_SIZE*cycle,
1868  msgSize);
1869  snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", msgSize);
1870  myProcess_->postSlave(msg1,true);
1871  bytesToSend -= msgSize;
1872  cycle++;
1873  }
1874  break;
1875  }
1876  case MSQM_MESSAGE_TYPE_TRP:
1877  {
1878  break;
1879  }
1880  }
1881  }
1882  catch(evf::Exception &e){std::cout << "exception caught in recevingM: " << e.what() << std::endl;}
1883  return true;
1884 }
1885 
// Body of a function whose signature line is not present in this listing.
// Obtains the "SignalMonitor" work loop of type "waiting", activates it if it
// is not active, binds FUEventProcessor::sigmon and sets signalMonitorActive_.
// An xcept::Exception is printed to std::cout and rethrown as evf::Exception.
// Listing lines 1891 and 1898 are missing here (presumably the assignment
// target and the submit call - confirm against the original source).
1887 {
1888  //todo rewind/check semaphore
1889  //start workloop
1890  try {
1892  toolbox::task::getWorkLoopFactory()->getWorkLoop("SignalMonitor",
1893  "waiting");
1894 
1895  if (!wlSignalMonitor_->isActive()) wlSignalMonitor_->activate();
1896  asSignalMonitor_ = toolbox::task::bind(this,&FUEventProcessor::sigmon,
1897  "SignalMonitor");
1899  signalMonitorActive_ = true;
1900  }
1901  catch (xcept::Exception& e) {
1902  std::string msg = "Failed to start workloop 'SignalMonitor'. (3)";
1903  std::cout << e.what() << std::endl;
1904  XCEPT_RETHROW(evf::Exception,msg,e);
1905  }
1906 }
1907 
1908 
// Work-loop callback that waits on sigmon_sem_ for signal notifications from
// slaves. After each wake-up it returns false, clearing signalMonitorActive_,
// unless the FSM state is "Enabled" or "enabling". Otherwise it counts the
// crash and records its time. Once crashesThisRun_ reaches crashesToDump_ and
// the core limit has not been changed yet, it sets the soft RLIMIT_CORE limit
// to 0 in this process and posts an RLI message to the sub-processes from
// slot 1 upwards.
// Listing line 1935 is missing from this listing.
1909 bool FUEventProcessor::sigmon(toolbox::task::WorkLoop* wl)
1910 {
1911  while (1) {
      // Blocks until the semaphore is posted.
1912  sem_wait(sigmon_sem_);
1913  std::cout << " received signal notification from slave!"<< std::endl;
1914 
1915  //check if shutdown time
1916  bool running = fsm_.stateName()->toString()=="Enabled";
1917  bool stopping = fsm_.stateName()->toString()=="stopping";
1918  bool enabling = fsm_.stateName()->toString()=="enabling";
1919  if (!running && !enabling) {
1920  signalMonitorActive_ = false;
1921  return false;
1922  }
1923 
1924  crashesThisRun_++;
1925  gettimeofday(&lastCrashTime_,0);
1926 
1927  //set core size limit to 0 in master and slaves
      // NOTE(review): past the early return above the state is "Enabled" or
      // "enabling", so `stopping` is always false here and the condition
      // reduces to `running` - confirm whether that is intended.
1928  if (crashesThisRun_>=crashesToDump_.value_ && (running || stopping) && !rlimit_coresize_changed_) {
1929 
1930  rlimit rlold;
1931  getrlimit(RLIMIT_CORE,&rlold);
1932  rlimit rlnew = rlold;
1933  rlnew.rlim_cur=0;
1934  setrlimit(RLIMIT_CORE,&rlnew);
1936  MsgBuf master_message_rli_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLI);
1937  //in case of frequent crashes, allow first slot to dump (until restart)
1938  unsigned int min=1;
1939  for (unsigned int i = min; i < subs_.size(); i++) {
1940  try {
      // Posting is best-effort: any exception is swallowed below.
1941  if (subs_[i].alive()) {
1942  subs_[i].post(master_message_rli_,false);
1943  }
1944  }
1945  catch (...) {}
1946  }
1947  std::ostringstream ostr;
1948  ostr << "Number of recent slave crashes reaches " << crashesThisRun_
1949  << ". Disabling core dumps for next 15 minutes in this FilterUnit";
1950  LOG4CPLUS_WARN(getApplicationLogger(),ostr.str());
1951  }
1952  }//end while loop
      // No break is visible inside the loop, so these two statements look
      // unreachable from the lines shown here.
1953  signalMonitorActive_ = false;
1954  return false;
1955 }
1956 
1957 
// Body of a function whose signature line is not present in this listing.
// Visible behaviour: attaches the DQM service to shared memory when hasShMem_
// is set, sleeps one second and reads evtProcessor_->statusAsync(). On any
// exception, or on a non-zero status code, it records reasonForFailedState_
// where shown and returns false. On success it fires "EnableDone"; an
// xcept::Exception from that call is printed and rethrown. Every visible
// return statement returns false.
// Many listing lines are missing from this listing (for example 1963, 1965,
// 1967, 1971, 1975-1977, 1982-1983, 1988-1989, 1996-1997, 2003-2004).
1959 {
1960  try {
1961  if(hasShMem_) attachDqmToShm();
1962  int sc = 0;
      // The bodies of both branches below are on missing listing lines.
1964  if(isRunNumberSetter_)
1966  else
1968 
1969  try{
1970  ::sleep(1);
1972  sc = evtProcessor_->statusAsync();
1973  }
1974  catch(cms::Exception &e) {
1978  return false;
1979  }
1980  catch(std::exception &e) {
1981  reasonForFailedState_ = e.what();
1984  return false;
1985  }
1986  catch(...) {
1987  reasonForFailedState_ = "Unknown Exception";
1990  return false;
1991  }
      // A non-zero status code is treated as a failed enable.
1992  if(sc != 0) {
1993  std::ostringstream oss;
1994  oss<<"EventProcessor::runAsync returned status code " << sc;
1995  reasonForFailedState_ = oss.str();
1998  return false;
1999  }
2000  }
2001  catch (xcept::Exception &e) {
2002  reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
2005  return false;
2006  }
2007  try{
2008  fsm_.fireEvent("EnableDone",this);
2009  }
2010  catch (xcept::Exception &e) {
2011  std::cout << "exception " << (std::string)e.what() << std::endl;
2012  throw;
2013  }
2014 
2015  return false;
2016 }
2017 
2019  ((FUEventProcessor*)addr)->forkProcessesFromEDM();
2020 }
2021 
2023 
2024  moduleweb::ForkParams * forkParams = &(forkInfoObj_->forkParams);
2025  unsigned int forkFrom=0;
2026  unsigned int forkTo=nbSubProcesses_.value_;
2027  if (forkParams->slotId>=0) {
2028  forkFrom=forkParams->slotId;
2029  forkTo=forkParams->slotId+1;
2030  }
2031 
2032  //before fork, make sure to disconnect output modules from Shm
2033  try {
2034  if (sorRef_) {
2035  std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
2036  for (unsigned int i=0;i<shmOutputs.size();i++) {
2037  //unregister PID from ShmBuffer/RB
2038  shmOutputs[i]->unregisterFromShm();
2039  //disconnect from Shm
2040  shmOutputs[i]->stop();
2041  }
2042  }
2043  }
2044  catch (std::exception &e)
2045  {
2046  reasonForFailedState_ = (std::string)"Thrown exception while disconnecting ShmOutputModule from Shm: " + e.what();
2047  LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
2050  }
2051  catch (...) {
2052  reasonForFailedState_ = "Thrown unknown exception while disconnecting ShmOutputModule from Shm: ";
2053  LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
2056  }
2057 
2058  std::string currentState = fsm_.stateName()->toString();
2059 
2060  //destroy MessageServicePresence thread before fork
2061  if (currentState!="stopping") {
2062  try {
2063  messageServicePresence_.reset();
2064  }
2065  catch (...) {
2066  LOG4CPLUS_ERROR(getApplicationLogger(),"Unable to destroy MessageService thread before fork!");
2067  }
2068  }
2069 
2070  if (currentState=="stopping") {
2071  LOG4CPLUS_ERROR(getApplicationLogger(),"Can not fork subprocesses in state " << fsm_.stateName()->toString());
2072  forkParams->isMaster=1;
2075  }
2076  //fork loop
2077  else for(unsigned int i=forkFrom; i<forkTo; i++)
2078  {
2079  int retval = subs_[i].forkNew();
2080  if(retval==0)
2081  {
2082  forkParams->isMaster=0;
2083  myProcess_ = &subs_[i];
2084  // dirty hack: delete/recreate global binary semaphore for later use in child
2086  toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
2087  int retval = pthread_mutex_destroy(&stop_lock_);
2088  if(retval != 0) perror("error");
2089  retval = pthread_mutex_init(&stop_lock_,0);
2090  if(retval != 0) perror("error");
2092 
2093  //recreate MessageLogger thread in slave after fork
2094  try{
2096  if(pf != 0) {
2097  messageServicePresence_ = pf->makePresence("MessageServicePresence");
2098  }
2099  else {
2100  LOG4CPLUS_ERROR(getApplicationLogger(),
2101  "SLAVE: Unable to create message service presence. pid:"<<getpid());
2102  }
2103  }
2104  catch(...) {
2105  LOG4CPLUS_ERROR(getApplicationLogger(),"SLAVE: Unknown Exception in MessageServicePresence. pid:"<<getpid());
2106  }
2107 
2109 
2110  //reconnect to Shm from output modules
2111  try {
2112  if (sorRef_) {
2113  std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
2114  for (unsigned int i=0;i<shmOutputs.size();i++)
2115  shmOutputs[i]->start();
2116  }
2117  }
2118  catch (...)
2119  {
2120  LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception (ShmOutputModule sending InitMsg (pid:"<<getpid() <<")");
2121  }
2122 
2123  if (forkParams->restart) {
2124  //do restart things
2125  scalersUpdates_ = 0;
2126  try {
2127  fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
2128  fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition
2129  fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition
2130  } catch (...) {
2131  LOG4CPLUS_WARN(getApplicationLogger(),"Failed to Stop/Enable FSM of the restarted slave EP");
2132  }
2133  try{
2134  xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
2135  if(lsid) {
2136  ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died
2137  }
2138  }
2139  catch(...){
2140  std::cout << "trouble with lsindex during restart" << std::endl;
2141  }
2142  try{
2143  xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
2144  if(lstb) {
2145  ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting
2146  }
2147  }
2148  catch(...){
2149  std::cout << "trouble with resetting flag for eol recovery " << std::endl;
2150  }
2153  }
2154 
2155  //start other threads
2159  while(!evtProcessor_.isWaitingForLs())
2160  ::usleep(100000);//wait for scalers loop to start
2161 
2162  //connect DQMShmOutputModule
2163  if(hasShMem_) attachDqmToShm();
2164 
2165  //catch transition error if we are already Enabled
2166  try {
2167  fsm_.fireEvent("EnableDone",this);
2168  }
2169  catch (...) {}
2170 
2171  //make sure workloops are started
2172  while (!wlReceiving_->isActive() || !wlReceivingMonitor_->isActive()) usleep(10000);
2173 
2174  //unmask signals
2175  sigset_t tmpset_thread;
2176  sigemptyset(&tmpset_thread);
2177  sigaddset(&tmpset_thread, SIGQUIT);
2178  sigaddset(&tmpset_thread, SIGILL);
2179  sigaddset(&tmpset_thread, SIGABRT);
2180  sigaddset(&tmpset_thread, SIGFPE);
2181  sigaddset(&tmpset_thread, SIGSEGV);
2182  sigaddset(&tmpset_thread, SIGALRM);
2183  //sigprocmask(SIG_UNBLOCK, &tmpset_thread, 0);
2184  pthread_sigmask(SIG_UNBLOCK,&tmpset_thread,0);
2185 
2186  //set signal handlers
2187  struct sigaction sa;
2188  sigset_t tmpset;
2189  memset(&tmpset,0,sizeof(tmpset));
2190  sigemptyset(&tmpset);
2191  sa.sa_mask=tmpset;
2192  sa.sa_flags=SA_RESETHAND | SA_SIGINFO;
2193  sa.sa_handler=0;
2194  sa.sa_sigaction=evfep_sighandler;
2195 
2196  sigaction(SIGQUIT,&sa,0);
2197  sigaction(SIGILL,&sa,0);
2198  sigaction(SIGABRT,&sa,0);
2199  sigaction(SIGFPE,&sa,0);
2200  sigaction(SIGSEGV,&sa,0);
2201  sa.sa_sigaction=evfep_alarmhandler;
2202  sigaction(SIGALRM,&sa,0);
2203 
2204  //child return to DaqSource
2205  return ;
2206  }
2207  else {
2208 
2209  forkParams->isMaster=1;
2211  if (forkParams->restart) {
2212  std::ostringstream ost1;
2213  ost1 << "-I- New Process " << retval << " forked for slot " << forkParams->slotId;
2214  localLog(ost1.str());
2215  }
2217  //start "crash" receiver workloop
2218  }
2219  }
2220 
2221  //recreate MessageLogger thread after fork
2222  try{
2223  //release the presense factory in master
2225  if(pf != 0) {
2226  messageServicePresence_ = pf->makePresence("MessageServicePresence");
2227  }
2228  else {
2229  LOG4CPLUS_ERROR(getApplicationLogger(),
2230  "Unable to recreate message service presence ");
2231  }
2232  }
2233  catch(...) {
2234  LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception in MessageServicePresence");
2235  }
2236  restart_in_progress_=false;
2237  edm_init_done_=true;
2238 }
2239 
// NOTE(review): the signature line of this member function and several
// interior statements are absent from this listing (the embedded listing
// numbers skip), so only the visible statements are described here.
//
// Visible behaviour: re-creates forkObjLock_, discards any previous
// forkInfoObj_, stores `this` in forkInfoObj_->fuAddr and then queries
// evtProcessor_->statusAsync(). Returns false (after recording
// reasonForFailedState_ where shown and logging at FATAL level) when the
// status code is non-zero or when any exception is caught; returns true
// otherwise.
2241 {
2243  try {
2244  //set to connect to Shm later
2245  //if(hasShMem_) setAttachDqmToShm();
2246 
2247  int sc = 0;
2248  //maybe not needed in MP mode
2250  if(isRunNumberSetter_)
2252  else
2254 
2255  //prepare object used to communicate with DaqSource
2256  pthread_mutex_destroy(&forkObjLock_);
2257  pthread_mutex_init(&forkObjLock_,0);
2258  if (forkInfoObj_) delete forkInfoObj_;
// NOTE(review): the old object is deleted above and the pointer is
// dereferenced below; the re-allocation is presumably on one of the lines
// missing from this listing -- confirm.
2261  forkInfoObj_->fuAddr=(void*)this;
2267  if (mwrRef_)
2269 
2271  sc = evtProcessor_->statusAsync();
2272 
// A non-zero status code is treated as a failure. NOTE(review): the message
// text names runAsync while the visible call is statusAsync(); the runAsync
// call is presumably on a missing line -- confirm.
2273  if(sc != 0) {
2274  std::ostringstream oss;
2275  oss<<"EventProcessor::runAsync returned status code " << sc;
2276  reasonForFailedState_ = oss.str();
2278  LOG4CPLUS_FATAL(log_,reasonForFailedState_);
2279  return false;
2280  }
2281  }
2282  //catch exceptions on master side
2283  catch(cms::Exception &e) {
2286  LOG4CPLUS_FATAL(log_,reasonForFailedState_);
2287  return false;
2288  }
2289  catch(std::exception &e) {
2290  reasonForFailedState_ = e.what();
2292  LOG4CPLUS_FATAL(log_,reasonForFailedState_);
2293  return false;
2294  }
2295  catch(...) {
2296  reasonForFailedState_ = "Unknown Exception";
2298  LOG4CPLUS_FATAL(log_,reasonForFailedState_);
2299  return false;
2300  }
2301  return true;
2302 }
2303 
// Requests a restart of the subprocess occupying slot `slotId`.
//
// Under the forkInfoObj_ lock the slot id is written into forkParams and the
// control semaphore is posted; the lock is then released. Afterwards the
// caller polls restart_in_progress_ at 20 ms intervals, at most 50 times
// (about one second), before giving up.
//
// @param slotId  value stored in forkInfoObj_->forkParams.slotId
// @return always true, including when the polling loop times out
//
// NOTE(review): listing lines 2309-2311 are absent here; further forkParams
// fields are presumably set on them -- confirm against the real source.
2304 bool FUEventProcessor::restartForkInEDM(unsigned int slotId) {
2305  //daqsource will keep this lock until master returns after fork
2306  //so that we don't do another EP restart in between
2307  forkInfoObj_->lock();
2308  forkInfoObj_->forkParams.slotId=slotId;
2312  LOG4CPLUS_DEBUG(log_, " restarting subprocess in slot "<< slotId <<": posting on control semaphore");
2313  sem_post(forkInfoObj_->control_sem_);
2314  forkInfoObj_->unlock();
2315  usleep(1000);
2316  //sleep until fork is performed
2317  int count=50;
// NOTE(review): the flag is raised only after the semaphore has been posted
// and a 1 ms sleep has elapsed. The fork code elsewhere in this file clears
// restart_in_progress_ when it finishes; if that happens before this
// assignment, the loop below waits for the full timeout -- confirm whether
// the flag should be set before sem_post.
2318  restart_in_progress_=true;
2319  while (restart_in_progress_ && count--) usleep(20000);
2320  return true;
2321 }
2322 
// NOTE(review): the signature line and the loop header (listing line 2326)
// are absent from this listing.
//
// Visible behaviour: calls enableCommon(), then logs and sleeps one second
// per iteration of a loop whose condition is not visible, records
// "-I- Start completed" through localLog() and returns the result of
// enableCommon().
2324 {
2325  bool retval = enableCommon();
2327  LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
2328  ::sleep(1);
2329  }
2330 
2331  // implementation moved to EPWrapper
2332  // startScalersWorkLoop(); // this is now not done any longer
2333  localLog("-I- Start completed");
2334  return retval;
2335 }
// NOTE(review): the signature line and several interior statements
// (including the definition of `pf`) are absent from this listing.
//
// Visible behaviour: waits, in one-second steps, until
// evtProcessor_.isWaitingForLs() reports true, creates a
// "MessageServicePresence" through `pf` when it is non-null, and finally
// returns the result of enableCommon().
2337 {
2338  //all this happens only in the child process
2339 
2344  while(!evtProcessor_.isWaitingForLs())
2345  ::sleep(1);
2346 
2347  // @EM test do not run monitor loop in slave, only receiving&Monitor
2348  // evtProcessor_.startMonitoringWorkLoop();
2349  try{
2350  // evtProcessor_.makeServicesOnly();
2351  try{
2353  if(pf != 0) {
// release() detaches the new presence from the returned smart pointer, so
// nothing in this block destroys it -- presumably deliberate so the object
// outlives this call; confirm.
2354  pf->makePresence("MessageServicePresence").release();
2355  }
2356  else {
2357  LOG4CPLUS_ERROR(getApplicationLogger(),
2358  "Unable to create message service presence ");
2359  }
2360  }
2361  catch(...) {
2362  LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
2363  }
2365  }
// In the visible lines this handler only records the reason text; listing
// lines 2368-2369 are missing, so any further action cannot be seen here.
2366  catch (xcept::Exception &e) {
2367  reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
2370  }
2371  bool retval = enableCommon();
2372  // while(evtProcessor_->getState()!= edm::event_processor::sRunning){
2373  // LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
2374  // ::sleep(1);
2375  // }
2376  return retval;
2377 }
2378 
// NOTE(review): the signature line and the conditions on listing lines
// 2384-2385 and 2391 are absent from this listing.
//
// Visible behaviour, in two stages:
//  1. Stop stage: on the success branch fires the "StopDone" FSM event;
//     otherwise, or on any exception, sets `failed` and records a
//     reasonForFailedState_.
//  2. DQM stage: when hasShMem_ is set, calls detachDqmFromShm(); exceptions
//     here also set `failed` and overwrite the recorded reason.
// A FATAL message is logged when either stage failed. The function returns
// false on every path (presumably the work-loop convention for "do not
// reschedule" -- confirm).
2380 {
2381  bool failed=false;
2382  try {
2383  LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping :) ...");
2386  fsm_.fireEvent("StopDone",this);
2387  else
2388  {
2389  failed=true;
2390  // epMState_ = evtProcessor_->currentStateName();
2392  reasonForFailedState_ = "EventProcessor stop timed out";
2393  else
2394  reasonForFailedState_ = "EventProcessor did not receive STOP event";
2395  }
2396  }
2397  catch (xcept::Exception &e) {
2398  failed=true;
2399  reasonForFailedState_ = "Stopping FAILED: " + (std::string)e.what();
2400  }
2401  catch (edm::Exception &e) {
2402  failed=true;
2403  reasonForFailedState_ = "Stopping FAILED: " + (std::string)e.what();
2404  }
2405  catch (...) {
2406  failed=true;
2407  reasonForFailedState_= "Stopping FAILED: unknown exception";
2408  }
// The DQM detach is attempted even when the stop stage above failed; the
// warning below is emitted only in that case, after a successful detach.
2409  try {
2410  if (hasShMem_) {
2411  detachDqmFromShm();
2412  if (failed)
2413  LOG4CPLUS_WARN(getApplicationLogger(),
2414  "In failed STOP - success detaching DQM from Shm. pid:" << getpid());
2415  }
2416  }
2417  catch (cms::Exception & e) {
2418  failed=true;
2419  reasonForFailedState_= "Stopping FAILED: " + (std::string)e.what();
2420  }
2421  catch (...) {
2422  failed=true;
2423  reasonForFailedState_= "DQM detach failed: Unknown exception";
2424  }
2425 
2426  if (failed) {
2427  LOG4CPLUS_FATAL(getApplicationLogger(),"STOP failed: "
2428  << reasonForFailedState_ << " (pid:" << getpid()<<")");
2431  }
2432 
2433  LOG4CPLUS_INFO(getApplicationLogger(),"Finished stopping!");
2434  localLog("-I- Stop completed");
2435  return false;
2436 }
2437 
// NOTE(review): the signature line and the declarations of `msg` and `msg1`
// (listing lines 2440-2441) are absent from this listing.
//
// Visible behaviour: two passes over subs_.
//  Pass 1: posts `msg` to every subprocess whose alive() is positive and
//          remembers which ones were contacted.
//  Pass 2: for each contacted subprocess, receives the reply into `msg1`;
//          if the reply type is MSQS_MESSAGE_TYPE_STOP, polls every 10 ms
//          until alive() is no longer positive, then disconnects.
// stop_lock_ is held around each post and each receive, and released before
// the wait-for-exit polling.
2439 {
2442 
// NOTE(review): the flag vector is sized from nbSubProcesses_.value_ but is
// indexed with i < subs_.size() below; this assumes subs_.size() never
// exceeds nbSubProcesses_.value_ -- confirm.
2443  std::vector<bool> processes_to_stop(nbSubProcesses_.value_,false);
2444  for(unsigned int i = 0; i < subs_.size(); i++)
2445  {
2446  pthread_mutex_lock(&stop_lock_);
2447  if(subs_[i].alive()>0){
2448  processes_to_stop[i] = true;
2449  subs_[i].post(msg,false);
2450  }
2451  pthread_mutex_unlock(&stop_lock_);
2452  }
2453  for(unsigned int i = 0; i < subs_.size(); i++)
2454  {
2455  pthread_mutex_lock(&stop_lock_);
2456  if(processes_to_stop[i]){
2457  try{
2458  subs_[i].rcv(msg1,false);
2459  }
// A failed receive is logged and the subprocess is skipped: no wait and no
// disconnect() happen for it in the visible lines.
2460  catch(evf::Exception &e){
2461  std::ostringstream ost;
2462  ost << "failed to get STOP - errno ->" << errno << " " << e.what();
2463  reasonForFailedState_ = ost.str();
2464  LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
2465  // fsm_.fireFailed(reasonForFailedState_,this);
2467  pthread_mutex_unlock(&stop_lock_);
2468  continue;
2469  }
2470  }
2471  else {
2472  pthread_mutex_unlock(&stop_lock_);
2473  continue;
2474  }
2475  pthread_mutex_unlock(&stop_lock_);
// This wait has no timeout: it spins until alive() stops being positive.
2476  if(msg1->mtype==MSQS_MESSAGE_TYPE_STOP)
2477  while(subs_[i].alive()>0) ::usleep(10000);
2478  subs_[i].disconnect();
2479  }
2480  // subs_.clear();
2481 
2482 }
2483 
// NOTE(review): the signature line and listing lines 2488-2489 and 2494 are
// absent from this listing.
//
// Visible behaviour: writes HTML table rows to `out`. The first row
// describes this process (FSM state, live/dead counters, pid link, DQM total
// and scalers update count). When nbSubProcesses_ is non-zero, one further
// row is written per entry of subs_, in one of three shapes: "Alive",
// not-alive ("NotInitialized" / "Done" / "Dead"), or "NotResp" when an
// evf::Exception is thrown while building the row. Exceptions escaping the
// per-subprocess handling are logged at INFO level and swallowed.
//
// NOTE(review): start_lock_ (and pickup_lock_ in the not-alive branch) are
// unlocked only on the normal path. If anything other than evf::Exception is
// thrown inside the loop, the outer handlers log it but leave the mutex
// locked -- confirm whether this can occur.
2485 {
2486  std::string urn = getApplicationDescriptor()->getURN();
2487  try{
// Early exit in a child process. Because of this return, the
// (myProcess_ ? "S" : "M") selection and the !myProcess_ test further down
// can only take their "master" outcome in the visible code.
2490  if(myProcess_) {std::cout << "microstate called for child! bail out" << std::endl; return;}
2491  *out << "<tr><td>" << fsm_.stateName()->toString()
2492  << "</td><td>"<< (myProcess_ ? "S" : "M") <<"</td><td>" << nblive_ << "</td><td>"
2493  << nbdead_ << "</td><td><a href=\"/" << urn << "/procStat\">" << getpid() <<"</a></td>";
2495  *out << "<td></td><td>" << nbTotalDQM_
2496  << "</td><td>" << evtProcessor_.getScalersUpdates() << "</td></tr>";
2497  if(nbSubProcesses_.value_!=0 && !myProcess_)
2498  {
2499  pthread_mutex_lock(&start_lock_);
2500  for(unsigned int i = 0; i < subs_.size(); i++)
2501  {
2502  try{
2503  if(subs_[i].alive()>0)
2504  {
// Row for a live subprocess.
// NOTE(review): the accept percentage divides by params().nbp with no zero
// check, and params().ls is printed on both sides of the "/" in the
// lumi-section cell -- confirm both are intended.
2505  *out << "<tr><td bgcolor=\"#00FF00\" id=\"a"
2506  << i << "\">""Alive</td><td>S</td><td>"
2507  << subs_[i].queueId() << "<td>"
2508  << subs_[i].queueStatus()<< "/"
2509  << subs_[i].queueOccupancy() << "/"
2510  << subs_[i].queuePidOfLastSend() << "/"
2511  << subs_[i].queuePidOfLastReceive()
2512  << "</td><td><a id=\"p"<< i << "\" href=\"SubWeb?process="
2513  << subs_[i].pid() << "&method=procStat\">"
2514  << subs_[i].pid()<<"</a></td>" //<< msg->mtext;
2515  << "<td>" << evtProcessor_.stateNameFromIndex(subs_[i].params().Ms) << "</td><td>"
2516  << evtProcessor_.moduleNameFromIndex(subs_[i].params().ms) << "</td><td>"
2517  << subs_[i].params().nba << "/" << subs_[i].params().nbp
2518  << " (" << float(subs_[i].params().nba)/float(subs_[i].params().nbp)*100. <<"%)"
2519  << "</td><td>" << subs_[i].params().ls << "/" << subs_[i].params().ls
2520  << "</td><td>" << subs_[i].params().ps
2521  << "</td><td"
2522  << ((subs_[i].params().eols<subs_[i].params().ls) ? " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"") << ">"
2523  << subs_[i].params().eols
2524  << "</td><td>" << subs_[i].params().dqm
2525  << "</td><td>" << subs_[i].params().trp << "</td>";
2526  }
2527  else
2528  {
// Row for a subprocess that is not alive. alive() == -1000 is rendered as
// "NotInitialized", 0 as "Done", any other non-positive value as "Dead".
2529  pthread_mutex_lock(&pickup_lock_);
2530  *out << "<tr><td id=\"a"<< i << "\" ";
2531  if(subs_[i].alive()==-1000)
2532  *out << " bgcolor=\"#bbaabb\">NotInitialized";
2533  else
2534  *out << (subs_[i].alive()==0 ? ">Done" : " bgcolor=\"#FF0000\">Dead");
2535  *out << "</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
2536  << subs_[i].queueStatus() << "/"
2537  << subs_[i].queueOccupancy() << "/"
2538  << subs_[i].queuePidOfLastSend() << "/"
2539  << subs_[i].queuePidOfLastReceive()
2540  << "</td><td id=\"p"<< i << "\">"
2541  <<subs_[i].pid()<<"</td><td colspan=\"5\">" << subs_[i].reasonForFailed();
// Restart information is appended only for the "Dead" case.
2542  if(subs_[i].alive()!=0 && subs_[i].alive()!=-1000)
2543  {
2544  if(autoRestartSlaves_ && subs_[i].restartCount()<=2)
2545  *out << " will restart in " << subs_[i].countdown() << " s";
2546  else if(autoRestartSlaves_)
2547  *out << " reached maximum restart count";
2548  else *out << " autoRestart is disabled ";
2549  }
2550  *out << "</td><td"
2551  << ((subs_[i].params().eols<subs_[i].params().ls) ?
2552  " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")
2553  << ">"
2554  << subs_[i].params().eols
2555  << "</td><td>" << subs_[i].params().dqm
2556  << "</td><td>" << subs_[i].params().trp << "</td>";
2557  pthread_mutex_unlock(&pickup_lock_);
2558  }
2559  *out << "</tr>";
2560  }
// Fallback row when building the normal row threw evf::Exception.
// NOTE(review): this row is not closed with "</tr>" in the visible code.
2561  catch(evf::Exception &e){
2562  *out << "<tr><td id=\"a"<< i << "\" "
2563  <<"bgcolor=\"#FFFF00\">NotResp</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
2564  << subs_[i].queueStatus() << "/"
2565  << subs_[i].queueOccupancy() << "/"
2566  << subs_[i].queuePidOfLastSend() << "/"
2567  << subs_[i].queuePidOfLastReceive()
2568  << "</td><td id=\"p"<< i << "\">"
2569  <<subs_[i].pid()<<"</td>";
2570  }
2571  }
2572  pthread_mutex_unlock(&start_lock_);
2573  }
2574  }
2575  catch(evf::Exception &e)
2576  {
2577  LOG4CPLUS_INFO(getApplicationLogger(),"evf::Exception caught in microstate - " << e.what());
2578  }
2579  catch(cms::Exception &e)
2580  {
2581  LOG4CPLUS_INFO(getApplicationLogger(),"cms::Exception caught in microstate - " << e.what());
2582  }
2583  catch(std::exception &e)
2584  {
2585  LOG4CPLUS_INFO(getApplicationLogger(),"std::Exception caught in microstate - " << e.what());
2586  }
2587  catch(...)
2588  {
2589  LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception caught in microstate - ");
2590  }
2591 
2592 }
2593 
2594 
// NOTE(review): the signature line and listing lines 2623 and 2652 are
// absent from this listing.
//
// Visible behaviour: writes the cached updaterStatic_ text to `out`, followed
// by a sequence of utils::mDiv / utils::cDiv entries carrying the current
// values: uptime, FSM state, run number, subprocess counts, class name,
// instance, HLT configuration link, squid flag, work-loop flags, event
// counters, iDie URL, vulture status, last-lumi data, the log ring and the
// CPU chart.
2596 {
2597  using namespace utils;
2598 
2599  *out << updaterStatic_;
2600  mDiv(out,"loads");
2601  uptime(out);
2602  cDiv(out);
2603  mDiv(out,"st",fsm_.stateName()->toString());
2604  mDiv(out,"ru",runNumber_.toString());
2605  mDiv(out,"nsl",nbSubProcesses_.value_);
2606  mDiv(out,"nsr",nbSubProcessesReporting_.value_);
2607  mDiv(out,"cl");
2608  *out << getApplicationDescriptor()->getClassName()
2609  << (nbSubProcesses_.value_ > 0 ? "MP " : " ");
2610  cDiv(out);
2611  mDiv(out,"in",getApplicationDescriptor()->getInstance());
// The configuration link is shown in every state except "Halted" and
// "halting".
2612  if(fsm_.stateName()->toString() != "Halted" && fsm_.stateName()->toString() != "halting"){
2613  mDiv(out,"hlt");
2614  *out << "<a href=\"" << configString_.toString() << "\">HLT Config</a>";
2615  cDiv(out);
2616  *out << std::endl;
2617  }
2618  else
2619  mDiv(out,"hlt","Not yet...");
2620 
2621  mDiv(out,"sq",squidPresent_.toString());
2622  mDiv(out,"vwl",(supervising_ ? "Active" : "Not Initialized"));
// Counters are reported as 0 unless both pointers are non-null.
// NOTE(review): the C-style casts assume both items really are
// xdata::UnsignedInteger32 -- confirm.
2624  if(nbProcessed != 0 && nbAccepted != 0)
2625  {
2626  mDiv(out,"tt",((xdata::UnsignedInteger32*)nbProcessed)->value_);
2627  mDiv(out,"ac",((xdata::UnsignedInteger32*)nbAccepted)->value_);
2628  }
2629  else
2630  {
2631  mDiv(out,"tt",0);
2632  mDiv(out,"ac",0);
2633  }
// The "swl" entry reports a different flag depending on whether myProcess_
// is set.
2634  if(!myProcess_)
2635  mDiv(out,"swl",(wlSummarizeActive_ ? "Active" : "Inactive"));
2636  else
2637  mDiv(out,"swl",(wlScalersActive_ ? "Active" : "Inactive"));
2638 
2639  mDiv(out,"idi",iDieUrl_.value_);
// NOTE(review): vulture_ is dereferenced whenever vp_ is non-zero, without a
// null check; this presumes vp_ != 0 implies vulture_ != 0 -- confirm.
2640  if(vp_!=0){
2641  mDiv(out,"vpi",(unsigned int) vp_);
2642  if(vulture_->hasStarted()>=0)
2643  mDiv(out,"vul","Prowling");
2644  else
2645  mDiv(out,"vul","Dead");
2646  }
2647  else{
2648  mDiv(out,"vul",(vulture_==0 ? "Nope" : "Hatching"));
2649  }
2650  if(evtProcessor_){
2651  mDiv(out,"ll");
2653  << "," << evtProcessor_.lastLumi().proc << "," << evtProcessor_.lastLumi().acc;
2654  cDiv(out);
2655  }
// Log ring output: entries from logRingIndex_ to the end of the ring first,
// then, only when logWrap_ is set, entries from 0 up to logRingIndex_.
2656  mDiv(out,"lg");
2657  for(unsigned int i = logRingIndex_; i<logRingSize_; i++)
2658  *out << logRing_[i] << std::endl;
2659  if(logWrap_)
2660  for(unsigned int i = 0; i<logRingIndex_; i++)
2661  *out << logRing_[i] << std::endl;
2662  cDiv(out);
2663  mDiv(out,"cha");
2664  if(cpustat_) *out << cpustat_->getChart();
2665  cDiv(out);
2666 }
2667 
// NOTE(review): both the signature line (listing line 2668) and the only
// statement of this body (listing line 2670) are absent from this listing;
// nothing about its behaviour can be stated from the visible text.
2669 {
2671 }
2672 
// NOTE(review): the signature line (listing line 2673) is absent from this
// listing.
//
// Forwards `buf` to myProcess_->postSlave(buf, true) when myProcess_ is set;
// does nothing otherwise. The value returned by postSlave is discarded.
2674 {
2675  if(myProcess_) myProcess_->postSlave(buf,true);
2676 }
2677 
// NOTE(review): the signature line and the catch clause that closes the try
// block (listing line 2696) are absent from this listing.
//
// Visible behaviour: builds a text fragment in a local ostringstream and
// stores it in updaterStatic_. The fragment contains the revision/release
// string, the outPut_/hasShMem_/hasModuleWebRegistry_/hasServiceWebRegistry_
// flags, the "monSleepSec" and "lsTimeOut" info-space items when they are
// found, and the fields reported by uname().
2679 {
2680  using namespace utils;
2681  std::ostringstream ost;
2682  mDiv(&ost,"ve");
2683  ost<< "$Revision: 1.164 $ (" << edm::getReleaseVersion() <<")";
2684  cDiv(&ost);
2685  mDiv(&ost,"ou",outPut_.toString());
2686  mDiv(&ost,"sh",hasShMem_.toString());
2687  mDiv(&ost,"mw",hasModuleWebRegistry_.toString());
2688  mDiv(&ost,"sw",hasServiceWebRegistry_.toString());
2689 
// Both pointers stay null when the lookup below does not complete, in which
// case the corresponding entries are simply omitted.
2690  xdata::Serializable *monsleep = 0;
2691  xdata::Serializable *lstimeout = 0;
2692  try{
2693  monsleep = applicationInfoSpace_->find("monSleepSec");
2694  lstimeout = applicationInfoSpace_->find("lsTimeOut");
2695  }
2697  }
2698 
2699  if(monsleep!=0)
2700  mDiv(&ost,"ms",monsleep->toString());
2701  if(lstimeout!=0)
2702  mDiv(&ost,"lst",lstimeout->toString());
// NOTE(review): a char array is reinterpreted as struct utsname, and the
// return value of uname() is not checked before the fields are printed.
2703  char cbuf[sizeof(struct utsname)];
2704  struct utsname* buf = (struct utsname*)cbuf;
2705  uname(buf);
2706  mDiv(&ost,"sysinfo");
2707  ost << buf->sysname << " " << buf->nodename
2708  << " " << buf->release << " " << buf->version << " " << buf->machine;
2709  cDiv(&ost);
2710  updaterStatic_ = ost.str();
2711 }
2712 
// Signal-time reporting routine invoked from evfep_sighandler.
//
// Sequence in the visible code: post sigmon_sem_, sleep two seconds, arm a
// five-second alarm, print the signal number, fault address and a stack
// trace to std::cout, log the same information at ERROR level, and finally
// raise the signal again.
//
// @param sig   signal number being handled
// @param info  siginfo_t supplied to the handler (si_pid and si_addr are read)
// @param c     third sigaction handler argument; not used in the visible lines
//
// NOTE(review): listing line 2730 (the condition selecting between the
// "Dumping core" and "Core dump count exceeded" messages) is absent from
// this listing.
// NOTE(review): stream output, string building and logging are performed
// while handling a signal; the alarm below is the stated guard against these
// "unsafe actions" deadlocking.
2713 void FUEventProcessor::handleSignalSlave(int sig, siginfo_t* info, void* c)
2714 {
2715  //notify master
2716  sem_post(sigmon_sem_);
2717 
2718  //sleep while master takes action
2719  sleep(2);
2720 
2721  //set up alarm if handler deadlocks on unsafe actions
2722  alarm(5);
2723 
// NOTE(review): this message labels info->si_pid as the process id, whereas
// the ERROR log further down uses getpid() -- confirm which is intended.
2724  std::cout << "--- Slave EP signal handler caught signal " << sig << " process id is " << info->si_pid <<" ---" << std::endl;
2725  std::cout << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl;
2726  std::cout << "--- Stacktrace follows --" << std::endl;
2727  std::ostringstream stacktr;
2728  toolbox::stacktrace(20,stacktr);
2729  std::cout << stacktr.str();
2731  std::cout << "--- Dumping core." << " --- " << std::endl;
2732  else
2733  std::cout << "--- Core dump count exceeded on this FU. ---"<<std::endl;
2734 
2735  std::string hasdump = "";
2736  if (rlimit_coresize_changed_) hasdump = " (core dump disabled) ";
2737 
2738  LOG4CPLUS_ERROR(getApplicationLogger(), "--- Slave EP signal handler caught signal " << sig << ". process id is " << getpid()
2739  << " on node " << toolbox::net::getHostName() << " ---" << std::endl
2740  << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl
2741  << "--- Stacktrace follows"<< hasdump << " ---" << std::endl << stacktr.str()
2742  );
2743 
2744  //re-raise signal with default handler (will cause core dump if enabled)
2745  raise(sig);
2746 }
2747 
2748 
// NOTE(review): framework macro applied to evf::FUEventProcessor; presumably
// it generates the XDAQ instantiation hook for this application class --
// confirm against the XDAQ headers.
2749 XDAQ_INSTANTIATOR_IMPL(evf::FUEventProcessor)
xdata::Boolean hasModuleWebRegistry_
static const char runNumber_[]
virtual char const * what() const
Definition: Exception.cc:141
dbl * delta
Definition: mlp_gen.cc:36
void summaryWebPage(xgi::Input *, xgi::Output *, const std::string &)
Definition: FWEPWrapper.cc:815
unsigned int ps
Definition: queue_defs.h:60
int stop()
Definition: Vulture.cc:254
unsigned long rcvSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:116
ShmOutputModuleRegistry * sorRef_
void spotlightWebPage(xgi::Input *, xgi::Output *)
bool check()
Definition: SquidNet.cc:21
xdata::Vector< xdata::Integer > spMStates_
int i
Definition: DBlmapReader.cc:9
xoap::MessageReference commandCallback(xoap::MessageReference msg)
Definition: StateMachine.cc:71
xdata::InfoSpace * monitorInfoSpace_
unsigned int eols
Definition: queue_defs.h:59
void subWeb(xgi::Input *in, xgi::Output *out)
pid_t makeProcess()
Definition: Vulture.cc:154
tuple start
Check for commandline option errors.
Definition: dqm_diff.py:58
#define Input(cl)
Definition: vmac.h:189
event_processor::State getState() const
unsigned int nbp
Definition: queue_defs.h:61
std::vector< std::string > logRing_
bool receivingAndMonitor(toolbox::task::WorkLoop *wl)
xdata::UnsignedInteger32 slaveRestartDelaySecs_
xdata::Boolean datasetCounting_
void updateRollingReport()
xdata::UnsignedInteger32 runNumber_
#define MSGQ_MESSAGE_TYPE_RANGE
Definition: queue_defs.h:13
void updater(xgi::Input *in, xgi::Output *out)
toolbox::task::WorkLoop * wlScalers_
pthread_mutex_t start_lock_
xdata::Integer epmAltState_
Definition: FWEPWrapper.h:197
std::map< std::string, std::string, std::less< std::string > > & getEnvironment()
#define MSQS_MESSAGE_TYPE_TRR
Definition: queue_defs.h:34
void startMonitoringWorkLoop()
Definition: FWEPWrapper.cc:614
virtual std::string explainSelf() const
Definition: Exception.cc:146
void publishForkInfo(std::string name, moduleweb::ForkInfoObj *forkInfoObj)
xdata::UnsignedInteger32 crashesToDump_
void taskWebPage(xgi::Input *, xgi::Output *, const std::string &)
Definition: FWEPWrapper.cc:881
tuple pieces
Definition: csv2json.py:31
bool restartForkInEDM(unsigned int slotId)
xdata::Boolean isRunNumberSetter_
void setAppCtxt(xdaq::ApplicationContext *ctx)
Definition: FWEPWrapper.h:91
ShmOutputModuleRegistry * getShmOutputModuleRegistry()
Definition: FWEPWrapper.cc:491
unsigned int acc
Definition: FWEPWrapper.h:45
xdata::UnsignedInteger32 forkInEDM_
void sendAuxLegenda(const std::string &)
Definition: RateStat.cc:29
xdata::Boolean hasServiceWebRegistry_
void clearCounters()
Clears counters used by trigger report.
toolbox::task::WorkLoop * wlReceiving_
void setCPUStat(int busyPer1k)
Definition: CPUStat.h:25
#define nullptr
boost::tokenizer< boost::char_separator< char > > tokenizer
void init(unsigned short, std::string &)
Definition: FWEPWrapper.cc:193
xdata::Boolean hasShMem_
MsgBuf & getPackedTriggerReport()
Definition: FWEPWrapper.h:114
#define MSQM_MESSAGE_TYPE_FSTOP
Definition: queue_defs.h:17
def pipe
Definition: pipe.py:5
toolbox::task::ActionSignature * asReceiveMsgAndExecute_
#define NULL
Definition: scimark2.h:8
toolbox::task::ActionSignature * asSignalMonitor_
FUEventProcessor * FUInstancePtr_
void(* forkHandler)(void *)
Definition: ModuleWeb.h:44
#define min(a, b)
Definition: mlp_lapack.h:161
unsigned int ls
Definition: queue_defs.h:58
void procStat(std::ostringstream *out)
Definition: procUtils.cc:176
void setupFastTimerService(unsigned int nProcesses)
Definition: FWEPWrapper.cc:497
xdata::InfoSpace * scalersLegendaInfoSpace_
toolbox::task::ActionSignature * asSupervisor_
void setRunNumber(RunNumber_t runNumber)
xdata::Boolean autoRestartSlaves_
tuple els
Definition: asciidump.py:420
std::string const & moduleNameFromIndex(unsigned int i) const
Definition: FWEPWrapper.h:122
xdata::UnsignedInteger32 nbSubProcessesReporting_
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
void forceInitEventProcessorMaybe()
Definition: FWEPWrapper.h:68
unsigned int crashesThisRun_
void css(xgi::Input *in, xgi::Output *out)
#define MAX_MSG_SIZE
Definition: queue_defs.h:10
void cDiv(std::ostringstream *out)
Definition: procUtils.cc:357
lsTriplet & lastLumi()
Definition: FWEPWrapper.h:133
void sleep(Duration_t)
Definition: Utils.h:163
#define MSQM_MESSAGE_TYPE_RLR
Definition: queue_defs.h:25
toolbox::task::ActionSignature * asReceiveMsgAndRead_
void setElapsed(int mseconds)
Definition: CPUStat.h:28
dictionary map
Definition: Association.py:205
xdata::Boolean exitOnError_
void evfep_sighandler(int sig, siginfo_t *info, void *c)
std::vector< SubProcess > subs_
xdata::InfoSpace * monitorLegendaInfoSpace_
void initialize(T *app)
Definition: StateMachine.h:84
bool summarize(toolbox::task::WorkLoop *wl)
void getSlavePids(xgi::Input *in, xgi::Output *out)
xdata::Serializable * nbProcessed
xoap::MessageReference fsmCallback(xoap::MessageReference msg)
pthread_mutex_t forkObjLock_
U second(std::pair< T, U > const &p)
std::string wlMonitoring()
Definition: FWEPWrapper.h:98
#define MSQM_MESSAGE_TYPE_PRG
Definition: queue_defs.h:19
xdata::Bag< xdaq2rc::ClassnameAndInstance > * rcmsStateListener()
Definition: StateMachine.h:72
void fireFailed(const std::string &errorMsg, void *originator)
#define MSQS_MESSAGE_TYPE_WEB
Definition: queue_defs.h:33
void adjustLsIndexForRestart()
Definition: FWEPWrapper.h:112
bool halting(toolbox::task::WorkLoop *wl)
#define MSQS_MESSAGE_TYPE_STOP
Definition: queue_defs.h:31
#define MSQS_MESSAGE_TYPE_PRR
Definition: queue_defs.h:32
bool fireScalersUpdate()
Definition: FWEPWrapper.cc:777
evf::StateMachine fsm_
bool scalers(toolbox::task::WorkLoop *wl)
unsigned int stopCondition
Definition: ModuleWeb.h:46
#define MSQM_MESSAGE_TYPE_RLI
Definition: queue_defs.h:24
int cycle
ModuleWebRegistry * getModuleWebRegistry()
Definition: FWEPWrapper.cc:485
unsigned int lsid_
Definition: FWEPWrapper.h:210
toolbox::task::WorkLoop * wlSupervising_
void declareRunNumber(RunNumber_t runNumber)
static void setAppl(xdaq::Application *app)
Definition: MLlog4cplus.cc:85
std::string reasonForFailedState_
void resetLumiSectionReferenceIndex()
Definition: FWEPWrapper.h:137
unsigned int proc
Definition: FWEPWrapper.h:44
bool monitoring(toolbox::task::WorkLoop *wl)
Definition: FWEPWrapper.cc:647
FUEventProcessor(xdaq::ApplicationStub *s)
int postSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:110
#define PIPE_READ
Definition: queue_defs.h:40
unsigned int psid_
Definition: FWEPWrapper.h:211
unsigned long long idleProcStats_
bool isAvailable() const
Definition: Service.h:47
xdata::Boolean iDieStatisticsGathering_
int evfep_raised_signal
int j
Definition: DBlmapReader.cc:9
unsigned int Ms
Definition: queue_defs.h:63
int start(std::string, int=0)
Definition: Vulture.cc:241
void setAppDesc(xdaq::ApplicationDescriptor *ad)
Definition: FWEPWrapper.h:90
bool sigmon(toolbox::task::WorkLoop *wl)
void setNproc(int nproc)
Definition: CPUStat.h:22
uint16_t mem[nChs][nEvts]
void scalersWeb(xgi::Input *, xgi::Output *)
void resetPackedTriggerReport()
Definition: FWEPWrapper.h:111
xdata::Boolean squidPresent_
std::list< std::string > names_
void setScalersInfoSpace(xdata::InfoSpace *sis, xdata::InfoSpace *slis)
Definition: FWEPWrapper.h:77
void addEntry(int sta)
Definition: CPUStat.h:17
std::vector< edm::FUShmOutputModule * > getShmOutputModules()
void sendStat(unsigned int)
Definition: CPUStat.cc:33
xdata::Boolean hasPrescaleService_
#define NUMERIC_MESSAGE_SIZE
Definition: queue_defs.h:38
bool first
Definition: L1TdeRCT.cc:94
int totalEvents() const
unsigned int getNumberOfMicrostates()
Definition: FWEPWrapper.h:140
void handleSignalSlave(int sig, siginfo_t *info, void *c)
bool receiving(toolbox::task::WorkLoop *wl)
bool stopping(toolbox::task::WorkLoop *wl)
void reset()
Definition: CPUStat.h:31
void procStat(xgi::Input *in, xgi::Output *out)
moduleweb::ForkInfoObj * forkInfoObj_
toolbox::task::WorkLoop * wlSummarize_
unsigned int nba
Definition: queue_defs.h:62
char const * stateName(event_processor::State s) const
void fireEvent(const std::string &evtType, void *originator)
xdata::Serializable * nbAccepted
tuple out
Definition: dbtoconf.py:99
void disableRcmsStateNotification()
Definition: StateMachine.h:64
toolbox::BSem * _s_mutex_ptr_
std::string getReleaseVersion()
#define MSQM_MESSAGE_TYPE_TRP
Definition: queue_defs.h:21
void localLog(std::string)
static PresenceFactory * get()
#define MSQS_MESSAGE_TYPE_MCR
Definition: queue_defs.h:30
void setMonitorInfoSpace(xdata::InfoSpace *mis, xdata::InfoSpace *mlis)
Definition: FWEPWrapper.h:83
static void forkProcessFromEDM_helper(void *addr)
ModuleWebRegistry * mwrRef_
bool configuring(toolbox::task::WorkLoop *wl)
int notstarted_state_code() const
Definition: FWEPWrapper.h:132
xdata::InfoSpace * applicationInfoSpace_
edm::EventProcessor::StatusCode stop()
Definition: FWEPWrapper.cc:503
xdata::UnsignedInteger32 nbSubProcesses_
bool enabling(toolbox::task::WorkLoop *wl)
std::auto_ptr< Presence > makePresence(std::string const &presence_type) const
xdata::String * stateName()
Definition: StateMachine.h:69
StatusCode statusAsync() const
void setApplicationInfoSpace(xdata::InfoSpace *is)
Definition: FWEPWrapper.h:82
unsigned int getLumiSectionReferenceIndex()
Definition: FWEPWrapper.h:139
std::string & getChart()
Definition: CPUStat.h:43
bool getTriggerReport(bool useLock)
Definition: FWEPWrapper.cc:694
unsigned int scalersUpdates_
xdata::String configString_
xdata::Boolean * foundRcmsStateListener()
Definition: StateMachine.h:78
std::vector< std::string > const & getmicromap() const
Definition: FWEPWrapper.h:141
xdata::UnsignedInteger32 superSleepSec_
ServiceToken getToken()
unsigned long long allProcStats_
xdata::UnsignedInteger32 lastLumiUsingEol_
Definition: FWEPWrapper.h:222
std::auto_ptr< edm::Presence > messageServicePresence_
#define PIPE_WRITE
Definition: queue_defs.h:41
std::string const & stateNameFromIndex(unsigned int i) const
Definition: FWEPWrapper.h:127
void sendMessageOverMonitorQueue(MsgBuf &)
pthread_mutex_t * mst_lock_
Definition: ModuleWeb.h:49
bool isWaitingForLs()
Definition: FWEPWrapper.h:135
void moduleWeb(xgi::Input *in, xgi::Output *out)
std::string const & configuration() const
Definition: FWEPWrapper.h:102
#define MSQM_MESSAGE_TYPE_STOP
Definition: queue_defs.h:16
void pathNames(xgi::Input *, xgi::Output *)
dictionary args
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
#define Output(cl)
Definition: vmac.h:193
void procCpuStat(unsigned long long &idleJiffies, unsigned long long &allJiffies)
Definition: procUtils.cc:157
void sendLegenda(const std::vector< std::string > &)
Definition: CPUStat.cc:39
unsigned int ls
Definition: FWEPWrapper.h:43
pthread_mutex_t pickup_lock_
void defaultWebPage(xgi::Input *in, xgi::Output *out)
unsigned int trp
Definition: queue_defs.h:66
toolbox::task::ActionSignature * asSummarize_
toolbox::task::WorkLoop * wlSignalMonitor_
tuple query
Definition: o2o.py:269
xdata::Vector< xdata::Integer > spmStates_
int hasStarted()
Definition: Vulture.cc:215
void sendLegenda(const std::string &)
Definition: RateStat.cc:24
xdata::Integer epMAltState_
Definition: FWEPWrapper.h:196
tuple cout
Definition: gather_cfg.py:121
xdata::UnsignedInteger32 instance_
void serviceWeb(xgi::Input *in, xgi::Output *out)
#define MSQM_MESSAGE_TYPE_MCS
Definition: queue_defs.h:18
void mDiv(std::ostringstream *out, std::string name)
Definition: procUtils.cc:354
edm::EventSummary eventSummary
toolbox::task::WorkLoop * wlReceivingMonitor_
void withdrawLumiSectionIncrement()
Definition: FWEPWrapper.h:138
#define MSQM_MESSAGE_TYPE_WEB
Definition: queue_defs.h:20
void microState(xgi::Input *in, xgi::Output *out)
void sumAndPackTriggerReport(MsgBuf &buf)
toolbox::task::ActionSignature * asScalers_
bool supervisor(toolbox::task::WorkLoop *wl)
TriggerReportStatic * getPackedTriggerReportAsStruct()
Definition: FWEPWrapper.h:115
#define MAX_PIPE_BUFFER_SIZE
Definition: queue_defs.h:42
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
int totalEventsPassed() const
pthread_mutex_t stop_lock_
void evfep_alarmhandler(int sig, siginfo_t *info, void *c)
unsigned int getScalersUpdates()
Definition: FWEPWrapper.h:136
unsigned int dqm
Definition: queue_defs.h:65
void uptime(std::ostringstream *out)
Definition: procUtils.cc:309
void publishConfigAndMonitorItems(bool)
Definition: FWEPWrapper.cc:112
xdata::InfoSpace * scalersInfoSpace_
unsigned int ms
Definition: queue_defs.h:64
void sendStat(const unsigned char *, size_t, unsigned int)
Definition: RateStat.cc:19
void actionPerformed(xdata::Event &e)
void microState(xgi::Input *in, xgi::Output *out)
void setRcms(xdaq::ApplicationDescriptor *rcms)
Definition: FWEPWrapper.h:89
static const unsigned int logRingSize_
xdata::Boolean epInitialized_
void findRcmsStateListener()
void enableEndPaths(bool active)