CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_3/src/EventFilter/Processor/src/FUEventProcessor.cc

Go to the documentation of this file.
00001 
00002 //
00003 // FUEventProcessor
00004 // ----------------
00005 //
00007 
00008 #include "FUEventProcessor.h"
00009 #include "procUtils.h"
00010 #include "EventFilter/Utilities/interface/CPUStat.h"
00011 #include "EventFilter/Utilities/interface/RateStat.h"
00012 
00013 #include "EventFilter/Utilities/interface/Exception.h"
00014 
00015 #include "EventFilter/Message2log4cplus/interface/MLlog4cplus.h"
00016 #include "EventFilter/Modules/interface/FUShmDQMOutputService.h"
00017 #include "EventFilter/Utilities/interface/ServiceWebRegistry.h"
00018 #include "EventFilter/Utilities/interface/ServiceWeb.h"
00019 #include "EventFilter/Utilities/interface/ModuleWeb.h"
00020 #include "EventFilter/Utilities/interface/ModuleWebRegistry.h"
00021 #include "EventFilter/Modules/src/FUShmOutputModule.h"
00022 
00023 #include "FWCore/PluginManager/interface/ProblemTracker.h"
00024 #include "FWCore/PluginManager/interface/PresenceFactory.h"
00025 #include "FWCore/Utilities/interface/Presence.h"
00026 #include "FWCore/Utilities/interface/Exception.h"
00027 #include "FWCore/Version/interface/GetReleaseVersion.h"
00028 #include "FWCore/ServiceRegistry/interface/Service.h"
00029 
00030 #include "toolbox/BSem.h" 
00031 #include "toolbox/Runtime.h"
00032 #include "toolbox/stacktrace.h"
00033 #include "toolbox/net/Utils.h"
00034 
00035 #include <boost/tokenizer.hpp>
00036 
00037 #include "xcept/tools.h"
00038 #include "xgi/Method.h"
00039 
00040 #include "cgicc/CgiDefs.h"
00041 #include "cgicc/Cgicc.h"
00042 #include "cgicc/FormEntry.h"
00043 
00044 
00045 #include <sys/wait.h>
00046 #include <sys/utsname.h>
00047 #include <sys/mman.h>
00048 #include <signal.h>
00049 
00050 #include <typeinfo>
00051 #include <stdlib.h>
00052 #include <stdio.h>
00053 #include <errno.h>
00054 
00055 
00056 using namespace evf;
00057 using namespace cgicc;
00058 namespace toolbox {
00059   namespace mem {
00060     extern toolbox::BSem * _s_mutex_ptr_; 
00061   }
00062 }
00063 
00064 //signal handler (global)
00065 namespace evf {
00066   FUEventProcessor * FUInstancePtr_;
00067   int evfep_raised_signal;
00068   void evfep_sighandler(int sig, siginfo_t* info, void* c)
00069   {
00070     evfep_raised_signal=sig;
00071     FUInstancePtr_->handleSignalSlave(sig, info, c);
00072   }
00073   void evfep_alarmhandler(int sig, siginfo_t* info, void* c)
00074   {
00075     if (evfep_raised_signal) {
00076       signal(evfep_raised_signal,SIG_DFL);
00077       raise(evfep_raised_signal);
00078     }
00079   }
00080 }
00081 
00083 // construction/destruction
00085 
00086 //______________________________________________________________________________
00087 FUEventProcessor::FUEventProcessor(xdaq::ApplicationStub *s) 
00088   : xdaq::Application(s)
00089   , fsm_(this)
00090   , log_(getApplicationLogger())
00091   , evtProcessor_(log_, getApplicationDescriptor()->getInstance())
00092   , runNumber_(0)
00093   , epInitialized_(false)
00094   , outPut_(true)
00095   , autoRestartSlaves_(false)
00096   , slaveRestartDelaySecs_(10)
00097   , hasShMem_(true)
00098   , hasPrescaleService_(true)
00099   , hasModuleWebRegistry_(true)
00100   , hasServiceWebRegistry_(true)
00101   , isRunNumberSetter_(true)
00102   , iDieStatisticsGathering_(false)
00103   , outprev_(true)
00104   , exitOnError_(true)
00105   , reasonForFailedState_()
00106   , squidnet_(3128,"https://localhost:8000/RELEASE-NOTES.txt")
00107   , logRing_(logRingSize_)
00108   , logRingIndex_(logRingSize_)
00109   , logWrap_(false)
00110   , nbSubProcesses_(0)
00111   , nbSubProcessesReporting_(0)
00112   , forkInEDM_(true)
00113   , nblive_(0)
00114   , nbdead_(0)
00115   , nbTotalDQM_(0)
00116   , wlReceiving_(0)
00117   , asReceiveMsgAndExecute_(0)
00118   , receiving_(false) 
00119   , wlReceivingMonitor_(0)
00120   , asReceiveMsgAndRead_(0)
00121   , receivingM_(false)
00122   , myProcess_(0)
00123   , wlSupervising_(0)
00124   , asSupervisor_(0)
00125   , supervising_(false)
00126   , wlSignalMonitor_(0)
00127   , asSignalMonitor_(0)
00128   , signalMonitorActive_(false)
00129   , monitorInfoSpace_(0)
00130   , monitorLegendaInfoSpace_(0)
00131   , applicationInfoSpace_(0)
00132   , nbProcessed(0)
00133   , nbAccepted(0)
00134   , scalersInfoSpace_(0)
00135   , scalersLegendaInfoSpace_(0)
00136   , wlScalers_(0)
00137   , asScalers_(0)
00138   , wlScalersActive_(false)
00139   , scalersUpdates_(0)
00140   , wlSummarize_(0)
00141   , asSummarize_(0)
00142   , wlSummarizeActive_(false)
00143   , superSleepSec_(1)
00144   , iDieUrl_("none")
00145   , vulture_(0)
00146   , vp_(0)
00147   , cpustat_(0)
00148   , ratestat_(0)
00149   , mwrRef_(nullptr)
00150   , sorRef_(nullptr)
00151   , master_message_prg_(0,MSQM_MESSAGE_TYPE_PRG)
00152   , master_message_prr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_PRR)
00153   , slave_message_prr_(sizeof(prg),MSQS_MESSAGE_TYPE_PRR)
00154   , master_message_trr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_TRR)
00155   , edm_init_done_(true)
00156   , crashesThisRun_(false)
00157   , rlimit_coresize_changed_(false)
00158   , crashesToDump_(2)
00159   , sigmon_sem_(0)
00160 {
00161   using namespace utils;
00162 
00163   FUInstancePtr_=this;
00164 
00165   names_.push_back("nbProcessed"    );
00166   names_.push_back("nbAccepted"     );
00167   names_.push_back("epMacroStateInt");
00168   names_.push_back("epMicroStateInt");
00169   // create pipe for web communication
00170   int retpipe = pipe(anonymousPipe_);
00171   if(retpipe != 0)
00172         LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to create pipe");
00173   // check squid presence
00174   squidPresent_ = squidnet_.check();
00175   //pass application parameters to FWEPWrapper
00176   evtProcessor_.setAppDesc(getApplicationDescriptor());
00177   evtProcessor_.setAppCtxt(getApplicationContext());
00178   // bind relevant callbacks to finite state machine
00179   fsm_.initialize<evf::FUEventProcessor>(this);
00180   
00181   //set sourceId_
00182   url_ =
00183     getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00184     getApplicationDescriptor()->getURN();
00185   class_   =getApplicationDescriptor()->getClassName();
00186   instance_=getApplicationDescriptor()->getInstance();
00187   sourceId_=class_.toString()+"_"+instance_.toString();
00188   LOG4CPLUS_INFO(getApplicationLogger(),sourceId_ <<" constructor"         );
00189   LOG4CPLUS_INFO(getApplicationLogger(),"CMSSW_BASE:"<<getenv("CMSSW_BASE"));
00190   
00191   getApplicationDescriptor()->setAttribute("icon", "/evf/images/epicon.jpg");
00192   
00193   xdata::InfoSpace *ispace = getApplicationInfoSpace();
00194   applicationInfoSpace_ = ispace;
00195 
00196   // default configuration
00197   ispace->fireItemAvailable("parameterSet",         &configString_                );
00198   ispace->fireItemAvailable("epInitialized",        &epInitialized_               );
00199   ispace->fireItemAvailable("stateName",             fsm_.stateName()             );
00200   ispace->fireItemAvailable("runNumber",            &runNumber_                   );
00201   ispace->fireItemAvailable("outputEnabled",        &outPut_                      );
00202 
00203   ispace->fireItemAvailable("hasSharedMemory",      &hasShMem_);
00204   ispace->fireItemAvailable("hasPrescaleService",   &hasPrescaleService_          );
00205   ispace->fireItemAvailable("hasModuleWebRegistry", &hasModuleWebRegistry_        );
00206   ispace->fireItemAvailable("hasServiceWebRegistry", &hasServiceWebRegistry_      );
00207   ispace->fireItemAvailable("isRunNumberSetter",    &isRunNumberSetter_           );
00208   ispace->fireItemAvailable("iDieStatisticsGathering",   &iDieStatisticsGathering_);
00209   ispace->fireItemAvailable("rcmsStateListener",     fsm_.rcmsStateListener()     );
00210   ispace->fireItemAvailable("foundRcmsStateListener",fsm_.foundRcmsStateListener());
00211   ispace->fireItemAvailable("nbSubProcesses",       &nbSubProcesses_              );
00212   ispace->fireItemAvailable("nbSubProcessesReporting",&nbSubProcessesReporting_   );
00213   ispace->fireItemAvailable("forkInEDM"             ,&forkInEDM_                  );
00214   ispace->fireItemAvailable("superSleepSec",        &superSleepSec_               );
00215   ispace->fireItemAvailable("autoRestartSlaves",    &autoRestartSlaves_           );
00216   ispace->fireItemAvailable("slaveRestartDelaySecs",&slaveRestartDelaySecs_       );
00217   ispace->fireItemAvailable("iDieUrl",              &iDieUrl_                     );
00218   ispace->fireItemAvailable("crashesToDump"         ,&crashesToDump_              );
00219 
00220   // Add infospace listeners for exporting data values
00221   getApplicationInfoSpace()->addItemChangedListener("parameterSet",        this);
00222   getApplicationInfoSpace()->addItemChangedListener("outputEnabled",       this);
00223 
00224   // findRcmsStateListener
00225   fsm_.findRcmsStateListener();
00226   
00227   // initialize monitoring infospace
00228 
00229   std::string monInfoSpaceName="evf-eventprocessor-status-monitor";
00230   toolbox::net::URN urn = this->createQualifiedInfoSpace(monInfoSpaceName);
00231   monitorInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00232 
00233   std::string monLegendaInfoSpaceName="evf-eventprocessor-status-legenda";
00234   urn = this->createQualifiedInfoSpace(monLegendaInfoSpaceName);
00235   monitorLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00236 
00237   
00238   monitorInfoSpace_->fireItemAvailable("url",                      &url_            );
00239   monitorInfoSpace_->fireItemAvailable("class",                    &class_          );
00240   monitorInfoSpace_->fireItemAvailable("instance",                 &instance_       );
00241   monitorInfoSpace_->fireItemAvailable("runNumber",                &runNumber_      );
00242   monitorInfoSpace_->fireItemAvailable("stateName",                 fsm_.stateName()); 
00243 
00244   monitorInfoSpace_->fireItemAvailable("squidPresent",             &squidPresent_   );
00245 
00246   std::string scalersInfoSpaceName="evf-eventprocessor-scalers-monitor";
00247   urn = this->createQualifiedInfoSpace(scalersInfoSpaceName);
00248   scalersInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00249 
00250   std::string scalersLegendaInfoSpaceName="evf-eventprocessor-scalers-legenda";
00251   urn = this->createQualifiedInfoSpace(scalersLegendaInfoSpaceName);
00252   scalersLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00253 
00254 
00255 
00256   evtProcessor_.setScalersInfoSpace(scalersInfoSpace_,scalersLegendaInfoSpace_);
00257   scalersInfoSpace_->fireItemAvailable("instance", &instance_);
00258 
00259   evtProcessor_.setApplicationInfoSpace(ispace);
00260   evtProcessor_.setMonitorInfoSpace(monitorInfoSpace_,monitorLegendaInfoSpace_);
00261   evtProcessor_.publishConfigAndMonitorItems(nbSubProcesses_.value_!=0);
00262 
00263   //subprocess state vectors for MP
00264   monitorInfoSpace_->fireItemAvailable("epMacroStateInt",             &spMStates_); 
00265   monitorInfoSpace_->fireItemAvailable("epMicroStateInt",             &spmStates_); 
00266   
00267   // Bind web interface
00268   xgi::bind(this, &FUEventProcessor::css,              "styles.css");
00269   xgi::bind(this, &FUEventProcessor::defaultWebPage,   "Default"   );
00270   xgi::bind(this, &FUEventProcessor::spotlightWebPage, "Spotlight" );
00271   xgi::bind(this, &FUEventProcessor::scalersWeb,       "scalersWeb");
00272   xgi::bind(this, &FUEventProcessor::pathNames,        "pathNames" );
00273   xgi::bind(this, &FUEventProcessor::subWeb,           "SubWeb"    );
00274   xgi::bind(this, &FUEventProcessor::getSlavePids,     "getSlavePids");
00275   xgi::bind(this, &FUEventProcessor::moduleWeb,        "moduleWeb" );
00276   xgi::bind(this, &FUEventProcessor::serviceWeb,       "serviceWeb");
00277   xgi::bind(this, &FUEventProcessor::microState,       "microState");
00278   xgi::bind(this, &FUEventProcessor::updater,          "updater"   );
00279   xgi::bind(this, &FUEventProcessor::procStat,         "procStat"  );
00280 
00281   // instantiate the plugin manager, not referenced here after!
00282 
00283   edm::AssertHandler ah;
00284 
00285   try{
00286     LOG4CPLUS_DEBUG(getApplicationLogger(),
00287                     "Trying to create message service presence ");
00288     edm::PresenceFactory *pf = edm::PresenceFactory::get();
00289     if(pf != 0) {
00290       pf->makePresence("MessageServicePresence").release();
00291     }
00292     else {
00293       LOG4CPLUS_ERROR(getApplicationLogger(),
00294                       "Unable to create message service presence ");
00295     }
00296   } 
00297   catch(...) {
00298     LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
00299   }
00300   ML::MLlog4cplus::setAppl(this);      
00301 
00302   typedef std::set<xdaq::ApplicationDescriptor*> AppDescSet_t;
00303   typedef AppDescSet_t::iterator                 AppDescIter_t;
00304   
00305   AppDescSet_t rcms=
00306     getApplicationContext()->getDefaultZone()->
00307     getApplicationDescriptors("RCMSStateListener");
00308   if(rcms.size()==0) 
00309     {
00310       LOG4CPLUS_WARN(getApplicationLogger(),
00311                        "MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
00312       //        localLog("-W- MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
00313     }
00314   else
00315     {
00316       AppDescIter_t it = rcms.begin();
00317       evtProcessor_.setRcms(*it);
00318     }
00319   pthread_mutex_init(&start_lock_,0);
00320   pthread_mutex_init(&stop_lock_,0);
00321   pthread_mutex_init(&pickup_lock_,0);
00322 
00323   forkInfoObj_=nullptr;
00324   pthread_mutex_init(&forkObjLock_,0);
00325   makeStaticInfo();
00326   startSupervisorLoop();  
00327 
00328   if(vulture_==0) vulture_ = new Vulture(true);
00329 
00331 
00332   AppDescSet_t setOfiDie=
00333     getApplicationContext()->getDefaultZone()->
00334     getApplicationDescriptors("evf::iDie");
00335   
00336   for (AppDescIter_t it=setOfiDie.begin();it!=setOfiDie.end();++it)
00337     if ((*it)->getInstance()==0) // there has to be only one instance of iDie
00338       iDieUrl_ = (*it)->getContextDescriptor()->getURL() + "/" + (*it)->getURN();
00339 
00340   //save default core file size
00341   getrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00342 
00343   //prepare IPC semaphore for getting the workloop waked up on signal caught in slaves
00344   #ifdef linux
00345   if (sigmon_sem_==0) {
00346     sigmon_sem_ = (sem_t*)mmap(NULL, sizeof(sem_t),
00347         PROT_READ | PROT_WRITE,
00348         MAP_ANONYMOUS | MAP_SHARED, 0, 0);
00349     if (!sigmon_sem_) {
00350       perror("mmap error\n");
00351       std::cout << "mmap error"<<std::endl;
00352     }
00353     else
00354       sem_init(sigmon_sem_,true,0);
00355   }
00356   #endif
00357 }
00358 //___________here ends the *huge* constructor___________________________________
00359 
00360 
00361 //______________________________________________________________________________
00362 FUEventProcessor::~FUEventProcessor()
00363 {
00364   // no longer needed since the wrapper is a member of the class and one can rely on 
00365   // implicit destructor - to be revised - at any rate the most common exit path is via "kill"...
00366   //  if (evtProcessor_) delete evtProcessor_;
00367   if(vulture_ != 0) delete vulture_;
00368 }
00369 
00370 
00372 // implementation of member functions
00374 
00375 
00376 //______________________________________________________________________________
00377 bool FUEventProcessor::configuring(toolbox::task::WorkLoop* wl)
00378 {
00379 //   std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
00380 //          << ((instance_.value_==0) ? 0x8 : 0) << " "
00381 //          << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
00382 //          << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
00383 //          << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
00384   unsigned short smap 
00385     = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00386     + (((instance_.value_%80)==0) ? 0x8 : 0) // have at least one legend per slice
00387     + (hasServiceWebRegistry_.value_ ? 0x4 : 0) 
00388     + (hasModuleWebRegistry_.value_ ? 0x2 : 0) 
00389     + (hasPrescaleService_.value_ ? 0x1 : 0);
00390   if(nbSubProcesses_.value_==0) 
00391     {
00392       spMStates_.setSize(1); 
00393       spmStates_.setSize(1); 
00394     }
00395   else
00396     {
00397       spMStates_.setSize(nbSubProcesses_.value_);
00398       spmStates_.setSize(nbSubProcesses_.value_);
00399       for(unsigned int i = 0; i < spMStates_.size(); i++)
00400         {
00401           spMStates_[i] = edm::event_processor::sInit; 
00402           spmStates_[i] = 0; 
00403         }
00404     }
00405   try {
00406     LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ...");
00407     std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00408     epInitialized_=true;
00409     if(evtProcessor_)
00410       {
00411         //get ref of mwr
00412         mwrRef_ = evtProcessor_.getModuleWebRegistry();
00413         sorRef_ = evtProcessor_.getShmOutputModuleRegistry();
00414         // moved to wrapper class
00415         configuration_ = evtProcessor_.configuration();
00416         if(nbSubProcesses_.value_==0) evtProcessor_.startMonitoringWorkLoop(); 
00417         evtProcessor_->beginJob(); 
00418         if(cpustat_) {delete cpustat_; cpustat_=0;}
00419         cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00420                                nbSubProcesses_.value_,
00421                                instance_.value_,
00422                                iDieUrl_.value_);
00423         if(ratestat_) {delete ratestat_; ratestat_=0;}
00424         ratestat_ = new RateStat(iDieUrl_.value_);
00425         if(iDieStatisticsGathering_.value_){
00426           try{
00427             cpustat_->sendLegenda(evtProcessor_.getmicromap());
00428             xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00429             if(legenda !=0){
00430               std::string slegenda = ((xdata::String*)legenda)->value_;
00431               ratestat_->sendLegenda(slegenda);
00432             }
00433 
00434           }
00435           catch(evf::Exception &e){
00436             LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
00437                            << e.what());
00438           }
00439         }
00440         
00441         fsm_.fireEvent("ConfigureDone",this);
00442         LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!");
00443         localLog("-I- Configuration completed");
00444 
00445       }
00446   }
00447   catch (xcept::Exception &e) {
00448     reasonForFailedState_ = "configuring FAILED: " + (std::string)e.what();
00449     fsm_.fireFailed(reasonForFailedState_,this);
00450     localLog(reasonForFailedState_);
00451   }
00452   catch(cms::Exception &e) {
00453     reasonForFailedState_ = e.explainSelf();
00454     fsm_.fireFailed(reasonForFailedState_,this);
00455     localLog(reasonForFailedState_);
00456   }    
00457   catch(std::exception &e) {
00458     reasonForFailedState_ = e.what();
00459     fsm_.fireFailed(reasonForFailedState_,this);
00460     localLog(reasonForFailedState_);
00461   }
00462   catch(...) {
00463     fsm_.fireFailed("Unknown Exception",this);
00464   }
00465 
00466   if(vulture_!=0 && vp_ == 0) vp_ = vulture_->makeProcess();
00467 
00468   return false;
00469 }
00470 
00471 
00472 
00473 
00474 //______________________________________________________________________________
00475 bool FUEventProcessor::enabling(toolbox::task::WorkLoop* wl)
00476 {
00477   nbTotalDQM_ = 0;
00478   scalersUpdates_ = 0;
00479   idleProcStats_ = 0;
00480   allProcStats_ = 0;
00481 //   std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
00482 //          << ((instance_.value_==0) ? 0x8 : 0) << " "
00483 //          << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
00484 //          << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
00485 //          << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
00486   unsigned short smap 
00487     = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00488     + (((instance_.value_%80)==0) ? 0x8 : 0) // have at least one legend per slice
00489     + (hasServiceWebRegistry_.value_ ? 0x4 : 0) 
00490     + (hasModuleWebRegistry_.value_ ? 0x2 : 0) 
00491     + (hasPrescaleService_.value_ ? 0x1 : 0);
00492 
00493   LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling...");
00494   
00495   //reset core limit size
00496   if (rlimit_coresize_changed_)
00497     setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00498   rlimit_coresize_changed_=false;
00499   crashesThisRun_=0;
00500   //recreate signal monitor sem
00501   sem_destroy(sigmon_sem_);
00502   sem_init(sigmon_sem_,true,0);
00503 
00504   if(!epInitialized_){
00505     evtProcessor_.forceInitEventProcessorMaybe();
00506   }
00507   std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00508   
00509   mwrRef_ = evtProcessor_.getModuleWebRegistry();
00510   sorRef_ = evtProcessor_.getShmOutputModuleRegistry();
00511 
00512   if(!epInitialized_){
00513     evtProcessor_->beginJob(); 
00514     if(cpustat_) {delete cpustat_; cpustat_=0;}
00515     cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00516                            nbSubProcesses_.value_,
00517                            instance_.value_,
00518                            iDieUrl_.value_);
00519     if(ratestat_) {delete ratestat_; ratestat_=0;}
00520     ratestat_ = new RateStat(iDieUrl_.value_);
00521     if(iDieStatisticsGathering_.value_){
00522       try
00523         {
00524         cpustat_->sendLegenda(evtProcessor_.getmicromap());
00525         xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00526         if(legenda !=0)
00527           {
00528             std::string slegenda = ((xdata::String*)legenda)->value_;
00529             ratestat_->sendLegenda(slegenda);
00530           }
00531         }
00532       catch(evf::Exception &e)
00533         {
00534           LOG4CPLUS_INFO(getApplicationLogger(),"could not send legenda"
00535                          << e.what());
00536         }
00537       catch (xcept::Exception& e) {
00538         LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda."
00539                         << e.what());
00540       }
00541     }
00542     epInitialized_ = true;
00543   }
00544   configuration_ = evtProcessor_.configuration(); // get it again after init has been carried out...
00545   evtProcessor_.resetLumiSectionReferenceIndex();
00546   //classic appl will return here 
00547   if(nbSubProcesses_.value_==0) return enableClassic();
00548   //protect manipulation of subprocess array
00549   pthread_mutex_lock(&start_lock_);
00550   pthread_mutex_lock(&pickup_lock_);
00551   subs_.clear();
00552   subs_.resize(nbSubProcesses_.value_); // this should not be necessary
00553   pid_t retval = -1;
00554 
00555   for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00556     {
00557       subs_[i]=SubProcess(i,retval); //this will replace all the scattered variables
00558     }
00559   pthread_mutex_unlock(&pickup_lock_);
00560 
00561   pthread_mutex_unlock(&start_lock_);
00562 
00563   //set expected number of child EP's for the Init message(s) sent to the SM
00564   try {
00565     if (sorRef_) {
00566       unsigned int nbExpectedEPWriters = nbSubProcesses_.value_;
00567       if (nbExpectedEPWriters==0) nbExpectedEPWriters=1;//master instance processing
00568       std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
00569       for (unsigned int i=0;i<shmOutputs.size();i++) {
00570         shmOutputs[i]->setNExpectedEPs(nbExpectedEPWriters);
00571       }
00572     }
00573   }
00574   catch (...)
00575   {
00576     LOG4CPLUS_ERROR(getApplicationLogger(),"Thrown Exception while setting nExpectedEPs in shmOutputs");
00577   }
00578 
00579   //use new method if configured
00580   edm_init_done_=true;
00581   if (forkInEDM_.value_) {
00582     edm_init_done_=false;
00583     enableForkInEDM();
00584   }
00585   else
00586     for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00587     {
00588       retval = subs_[i].forkNew();
00589       if(retval==0)
00590         {
00591           myProcess_ = &subs_[i];
00592           // dirty hack: delete/recreate global binary semaphore for later use in child
00593           delete toolbox::mem::_s_mutex_ptr_;
00594           toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
00595           int retval = pthread_mutex_destroy(&stop_lock_);
00596           if(retval != 0) perror("error");
00597           retval = pthread_mutex_init(&stop_lock_,0);
00598           if(retval != 0) perror("error");
00599           fsm_.disableRcmsStateNotification();
00600           
00601           return enableMPEPSlave();
00602           // the loop is broken in the child 
00603         }
00604     }
00605 
00606   if (forkInEDM_.value_) {
00607 
00608     edm::event_processor::State st;
00609     while (!edm_init_done_) {
00610       usleep(10000);
00611       st = evtProcessor_->getState();
00612       if (st==edm::event_processor::sError || st==edm::event_processor::sInvalid) break;
00613     }
00614     st = evtProcessor_->getState();
00615     //error handling: EP must fork during sRunning
00616     if (st!=edm::event_processor::sRunning) {
00617       reasonForFailedState_ = std::string("Master edm::EventProcessor in state ") + evtProcessor_->stateName(st);
00618       localLog(reasonForFailedState_);
00619       fsm_.fireFailed(reasonForFailedState_,this);
00620       return false;
00621     }
00622 
00623     sleep(1);
00624     startSummarizeWorkLoop();
00625     startSignalMonitorWorkLoop();//only with new forking
00626     vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00627 
00628     //enable after we are done with conditions loading and forking
00629     LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00630     fsm_.fireEvent("EnableDone",this);
00631     localLog("-I- Start completed");
00632     return false;
00633   }
00634 
00635   startSummarizeWorkLoop();
00636   vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00637   LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00638   fsm_.fireEvent("EnableDone",this);
00639   localLog("-I- Start completed");
00640   return false;
00641 }
00642 
00643 bool FUEventProcessor::doEndRunInEDM() {
00644   //taking care that EP in master is in stoppable state
00645   if (forkInfoObj_) {
00646 
00647     int count = 30;
00648     bool waitedForEDM=false;
00649     while (!edm_init_done_ && count) {
00650       ::sleep(1);
00651       if (count%5==0)
00652         LOG4CPLUS_WARN(log_,"MASTER EP: Stopping while EP busy in beginRun. waiting " <<count<< "sec");
00653       count--;
00654       waitedForEDM=true;
00655     }
00656     //sleep a few more seconds it was early stop
00657     if (waitedForEDM) sleep(5);
00658 
00659     //if (count==0) fsm_.fireFailed("failed to stop Master EP",this);
00660 
00661     if (evtProcessor_->getState()==edm::event_processor::sJobReady)
00662       return true;//we are already done
00663 
00664     forkInfoObj_->lock();
00665     forkInfoObj_->stopCondition=true;
00666     sem_post(forkInfoObj_->control_sem_);
00667     forkInfoObj_->unlock();
00668 
00669     count = 30;
00670 
00671     edm::event_processor::State st;
00672     while(count--) {
00673       st = evtProcessor_->getState();
00674       if (st==edm::event_processor::sRunning) ::usleep(100000);
00675       else if (st==edm::event_processor::sStopping || st==edm::event_processor::sJobReady) {
00676         break;
00677       }
00678       else {
00679         std::ostringstream ost;
00680         ost << "Master edm::EventProcessor is in state "<< evtProcessor_->stateName(st) << " while stopping";
00681         LOG4CPLUS_ERROR(getApplicationLogger(),ost.str());
00682         fsm_.fireFailed(ost.str(),this);
00683         return false;
00684       }
00685       if (count%5==0 && st==edm::event_processor::sRunning && !forkInfoObj_->receivedStop_) {
00686         forkInfoObj_->lock();
00687         forkInfoObj_->stopCondition=true;
00688         sem_post(forkInfoObj_->control_sem_);
00689         forkInfoObj_->unlock();
00690         LOG4CPLUS_WARN(getApplicationLogger(),
00691           "Master edm::EventProcessor still running after "<< (30-count-1) << " seconds. \"sem_post\" was executed again" );
00692       }
00693     }
00694     if (count<0) {
00695       std::ostringstream ost;
00696       if (!forkInfoObj_->receivedStop_)
00697         ost << "Timeout waiting for Master edm::EventProcessor to go stopping state "<<evtProcessor_->stateName(st) << ": input source did not receive stop signal!";
00698       else
00699         ost << "Timeout waiting for Master edm::EventProcessor to go stopping state "<<evtProcessor_->stateName(st);
00700       LOG4CPLUS_ERROR(getApplicationLogger(),ost.str());
00701       fsm_.fireFailed(ost.str(),this);
00702       return false;
00703     }
00704   }
00705   return true;
00706 }
00707 
00708 //______________________________________________________________________________
00709 bool FUEventProcessor::stopping(toolbox::task::WorkLoop* wl)
00710 {
00711   setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00712   if(nbSubProcesses_.value_!=0) {
00713     stopSlavesAndAcknowledge();
00714     if (forkInEDM_.value_) {
00715             //only in new forking for now
00716             sem_post(sigmon_sem_);
00717             doEndRunInEDM();
00718     }
00719   }
00720   vulture_->stop();
00721 
00722   if (forkInEDM_.value_) {
00723     //shared memory was already disconnected in master
00724     bool tmpHasShMem_=hasShMem_;
00725     hasShMem_=false;
00726     stopClassic();
00727     hasShMem_=tmpHasShMem_;
00728     return false;
00729   }
00730   stopClassic();
00731   return false;
00732 }
00733 
00734 
00735 //______________________________________________________________________________
00736 bool FUEventProcessor::halting(toolbox::task::WorkLoop* wl)
00737 {
00738   LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ...");
00739   setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00740   if(nbSubProcesses_.value_!=0) { 
00741     stopSlavesAndAcknowledge();
00742     if (forkInEDM_.value_) {
00743             sem_post(sigmon_sem_);
00744             doEndRunInEDM();
00745     }
00746   }
00747   try{
00748     evtProcessor_.stopAndHalt();
00749     //cleanup forking variables
00750     if (forkInfoObj_) {
00751       delete forkInfoObj_;
00752       forkInfoObj_=0;
00753     }
00754   }
00755   catch (evf::Exception &e) {
00756     reasonForFailedState_ = "halting FAILED: " + (std::string)e.what();
00757     localLog(reasonForFailedState_);
00758     fsm_.fireFailed(reasonForFailedState_,this);
00759   }
00760   //  if(hasShMem_) detachDqmFromShm();
00761 
00762   LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!");
00763   fsm_.fireEvent("HaltDone",this);
00764 
00765   localLog("-I- Halt completed");
00766   return false;
00767 }
00768 
00769 
00770 //______________________________________________________________________________
00771 xoap::MessageReference FUEventProcessor::fsmCallback(xoap::MessageReference msg)
00772   throw (xoap::exception::Exception)
00773 {
00774   return fsm_.commandCallback(msg);
00775 }
00776 
00777 
00778 //______________________________________________________________________________
00779 void FUEventProcessor::actionPerformed(xdata::Event& e)
00780 {
00781 
00782   if (e.type()=="ItemChangedEvent" && fsm_.stateName()->toString()!="Halted") {
00783     std::string item = dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00784     
00785     if ( item == "parameterSet") {
00786       LOG4CPLUS_WARN(getApplicationLogger(),
00787                      "HLT Menu changed, force reinitialization of EventProcessor");
00788       epInitialized_ = false;
00789     }
00790     if ( item == "outputEnabled") {
00791       if(outprev_ != outPut_) {
00792         LOG4CPLUS_WARN(getApplicationLogger(),
00793                        (outprev_ ? "Disabling " : "Enabling ")<<"global output");
00794         evtProcessor_->enableEndPaths(outPut_);
00795         outprev_ = outPut_;
00796       }
00797     }
00798     if (item == "globalInputPrescale") {
00799       LOG4CPLUS_WARN(getApplicationLogger(),
00800                      "Setting global input prescale has no effect "
00801                      <<"in this version of the code");
00802     }
00803     if ( item == "globalOutputPrescale") {
00804       LOG4CPLUS_WARN(getApplicationLogger(),
00805                      "Setting global output prescale has no effect "
00806                      <<"in this version of the code");
00807     }
00808   }
00809   
00810 }
00811 
00812 //______________________________________________________________________________
00813 void FUEventProcessor::getSlavePids(xgi::Input  *in, xgi::Output *out)
00814 {
00815   for (unsigned int i=0;i<subs_.size();i++)
00816   {
00817     if (i!=0) *out << ",";
00818     *out << subs_[i].pid();
00819   }
00820 }
00821 //______________________________________________________________________________
00822 void FUEventProcessor::subWeb(xgi::Input  *in, xgi::Output *out)
00823 {
00824   using namespace cgicc;
00825   pid_t pid = 0;
00826   std::ostringstream ost;
00827   ost << "&";
00828 
00829   Cgicc cgi(in);
00830   internal::MyCgi *mycgi = (internal::MyCgi*)in;
00831   for(std::map<std::string, std::string, std::less<std::string> >::iterator mit = 
00832         mycgi->getEnvironment().begin();
00833       mit != mycgi->getEnvironment().end(); mit++)
00834     ost << mit->first << "%" << mit->second << ";";
00835   std::vector<FormEntry> els = cgi.getElements() ;
00836   std::vector<FormEntry> el1;
00837   cgi.getElement("method",el1);
00838   std::vector<FormEntry> el2;
00839   cgi.getElement("process",el2);
00840   if(el1.size()!=0) {
00841     std::string meth = el1[0].getValue();
00842     if(el2.size()!=0) {
00843       unsigned int i = 0;
00844       std::string mod = el2[0].getValue();
00845       pid = atoi(mod.c_str()); // get the process id to be polled
00846       for(; i < subs_.size(); i++)
00847         if(subs_[i].pid()==pid) break;
00848       if(i>=subs_.size()){ //process was not found, let the browser know
00849         *out << "ERROR 404 : Process " << pid << " Not Found !" << std::endl;
00850         return;
00851       } 
00852       if(subs_[i].alive() != 1){
00853         *out << "ERROR 405 : Process " << pid << " Not Alive !" << std::endl;
00854         return;
00855       }
00856       MsgBuf msg1(meth.length()+ost.str().length()+1,MSQM_MESSAGE_TYPE_WEB);
00857       strncpy(msg1->mtext,meth.c_str(),meth.length());
00858       strncpy(msg1->mtext+meth.length(),ost.str().c_str(),ost.str().length());
00859       subs_[i].post(msg1,true);
00860       unsigned int keep_supersleep_original_value = superSleepSec_.value_;
00861       superSleepSec_.value_=10*keep_supersleep_original_value;
00862       MsgBuf msg(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_WEB);
00863       bool done = false;
00864       std::vector<char *>pieces;
00865       while(!done){
00866         unsigned long retval1 = subs_[i].rcvNonBlocking(msg,true);
00867         if(retval1 == MSGQ_MESSAGE_TYPE_RANGE){
00868 	  ::sleep(1);
00869           continue;
00870         }
00871         unsigned int nbytes = atoi(msg->mtext);
00872         if(nbytes < MAX_PIPE_BUFFER_SIZE) done = true; // this will break the while loop
00873         char *buf= new char[nbytes];
00874         ssize_t retval = read(anonymousPipe_[PIPE_READ],buf,nbytes);
00875         if(retval!=nbytes) std::cout 
00876           << "CAREFUL HERE, read less bytes than expected from pipe in subWeb" << std::endl;
00877         pieces.push_back(buf);
00878       }
00879       superSleepSec_.value_=keep_supersleep_original_value;
00880       for(unsigned int j = 0; j < pieces.size(); j++){
00881         *out<<pieces[j];    // chain the buffers into the output strstream
00882         delete[] pieces[j]; //make sure to release all buffers used for reading the pipe
00883       }
00884     }
00885   }
00886 }
00887 
00888 
00889 //______________________________________________________________________________
00890 void FUEventProcessor::defaultWebPage(xgi::Input  *in, xgi::Output *out)
00891   throw (xgi::exception::Exception)
00892 {
00893 
00894 
00895   *out << "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">" 
00896        << "<html><head><title>" << getApplicationDescriptor()->getClassName() << (nbSubProcesses_.value_ > 0 ? "MP " : " ") 
00897        << getApplicationDescriptor()->getInstance() << "</title>"
00898        << "<meta https-equiv=\"REFRESH\" content=\"0;url=/evf/html/defaultBasePage.html\">"
00899        << "</head></html>";
00900 }
00901 
00902 
00903 //______________________________________________________________________________
00904 
00905 
00906 void FUEventProcessor::spotlightWebPage(xgi::Input  *in, xgi::Output *out)
00907   throw (xgi::exception::Exception)
00908 {
00909 
00910   std::string urn = getApplicationDescriptor()->getURN();
00911 
00912   *out << "<!-- base href=\"/" <<  urn
00913        << "\"> -->" << std::endl;
00914   *out << "<html>"                                                   << std::endl;
00915   *out << "<head>"                                                   << std::endl;
00916   *out << "<link type=\"text/css\" rel=\"stylesheet\"";
00917   *out << " href=\"/evf/html/styles.css\"/>"                   << std::endl;
00918   *out << "<title>" << getApplicationDescriptor()->getClassName() 
00919        << getApplicationDescriptor()->getInstance() 
00920        << " MAIN</title>"     << std::endl;
00921   *out << "</head>"                                                  << std::endl;
00922   *out << "<body>"                                                   << std::endl;
00923   *out << "<table border=\"0\" width=\"100%\">"                      << std::endl;
00924   *out << "<tr>"                                                     << std::endl;
00925   *out << "  <td align=\"left\">"                                    << std::endl;
00926   *out << "    <img"                                                 << std::endl;
00927   *out << "     align=\"middle\""                                    << std::endl;
00928   *out << "     src=\"/evf/images/spoticon.jpg\""                            << std::endl;
00929   *out << "     alt=\"main\""                                        << std::endl;
00930   *out << "     width=\"64\""                                        << std::endl;
00931   *out << "     height=\"64\""                                       << std::endl;
00932   *out << "     border=\"\"/>"                                       << std::endl;
00933   *out << "    <b>"                                                  << std::endl;
00934   *out << getApplicationDescriptor()->getClassName() 
00935        << getApplicationDescriptor()->getInstance()                  << std::endl;
00936   *out << "      " << fsm_.stateName()->toString()                   << std::endl;
00937   *out << "    </b>"                                                 << std::endl;
00938   *out << "  </td>"                                                  << std::endl;
00939   *out << "  <td width=\"32\">"                                      << std::endl;
00940   *out << "    <a href=\"/urn:xdaq-application:lid=3\">"             << std::endl;
00941   *out << "      <img"                                               << std::endl;
00942   *out << "       align=\"middle\""                                  << std::endl;
00943   *out << "       src=\"/hyperdaq/images/HyperDAQ.jpg\""             << std::endl;
00944   *out << "       alt=\"HyperDAQ\""                                  << std::endl;
00945   *out << "       width=\"32\""                                      << std::endl;
00946   *out << "       height=\"32\""                                     << std::endl;
00947   *out << "       border=\"\"/>"                                     << std::endl;
00948   *out << "    </a>"                                                 << std::endl;
00949   *out << "  </td>"                                                  << std::endl;
00950   *out << "  <td width=\"32\">"                                      << std::endl;
00951   *out << "  </td>"                                                  << std::endl;
00952   *out << "  <td width=\"32\">"                                      << std::endl;
00953   *out << "    <a href=\"/" << urn << "/\">"                         << std::endl;
00954   *out << "      <img"                                               << std::endl;
00955   *out << "       align=\"middle\""                                  << std::endl;
00956   *out << "       src=\"/evf/images/epicon.jpg\""                    << std::endl;
00957   *out << "       alt=\"main\""                                      << std::endl;
00958   *out << "       width=\"32\""                                      << std::endl;
00959   *out << "       height=\"32\""                                     << std::endl;
00960   *out << "       border=\"\"/>"                                     << std::endl;
00961   *out << "    </a>"                                                 << std::endl;
00962   *out << "  </td>"                                                  << std::endl;
00963   *out << "</tr>"                                                    << std::endl;
00964   *out << "</table>"                                                 << std::endl;
00965 
00966   *out << "<hr/>"                                                    << std::endl;
00967   
00968   std::ostringstream ost;
00969   if(myProcess_) 
00970     ost << "/SubWeb?process=" << getpid() << "&method=moduleWeb&";
00971   else
00972     ost << "/moduleWeb?";
00973   urn += ost.str();
00974   if(evtProcessor_ && (myProcess_ || nbSubProcesses_.value_==0))
00975     evtProcessor_.taskWebPage(in,out,urn);
00976   else if(evtProcessor_)
00977     evtProcessor_.summaryWebPage(in,out,urn);
00978   else
00979     *out << "<td>HLT Unconfigured</td>" << std::endl;
00980   *out << "</table>"                                                 << std::endl;
00981   
00982   *out << "<br><textarea rows=" << 10 << " cols=80 scroll=yes>"      << std::endl;
00983   *out << configuration_                                             << std::endl;
00984   *out << "</textarea><P>"                                           << std::endl;
00985   
00986   *out << "</body>"                                                  << std::endl;
00987   *out << "</html>"                                                  << std::endl;
00988 
00989 
00990 }
00991 void FUEventProcessor::scalersWeb(xgi::Input  *in, xgi::Output *out)
00992   throw (xgi::exception::Exception)
00993 {
00994 
00995   out->getHTTPResponseHeader().addHeader( "Content-Type",
00996                                           "application/octet-stream" );
00997   out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
00998                                           "binary" );
00999   if(evtProcessor_ != 0){
01000     out->write( (char*)(evtProcessor_.getPackedTriggerReportAsStruct()), sizeof(TriggerReportStatic) );
01001   }
01002 }
01003 
01004 void FUEventProcessor::pathNames(xgi::Input  *in, xgi::Output *out)
01005   throw (xgi::exception::Exception)
01006 {
01007 
01008   if(evtProcessor_ != 0){
01009     xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
01010     if(legenda !=0){
01011       std::string slegenda = ((xdata::String*)legenda)->value_;
01012       *out << slegenda << std::endl;
01013     }
01014   }
01015 }
01016 
01017 
01018 void FUEventProcessor::setAttachDqmToShm() throw (evf::Exception)  
01019 {
01020   std::string errmsg;
01021   try {
01022     edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
01023     if(edm::Service<FUShmDQMOutputService>().isAvailable())
01024       edm::Service<FUShmDQMOutputService>()->setAttachToShm();
01025   }
01026   catch (cms::Exception& e) {
01027     errmsg = "Failed to set to attach DQM service to shared memory: " + (std::string)e.what();
01028   }
01029   if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
01030 }
01031 
01032 
01033 void FUEventProcessor::attachDqmToShm() throw (evf::Exception)  
01034 {
01035   std::string errmsg;
01036   bool success = false;
01037   try {
01038     edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
01039     if(edm::Service<FUShmDQMOutputService>().isAvailable())
01040       success = edm::Service<FUShmDQMOutputService>()->attachToShm();
01041     if (!success) errmsg = "Failed to attach DQM service to shared memory";
01042   }
01043   catch (cms::Exception& e) {
01044     errmsg = "Failed to attach DQM service to shared memory: " + (std::string)e.what();
01045   }
01046   if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
01047 }
01048 
01049 
01050 
01051 void FUEventProcessor::detachDqmFromShm() throw (evf::Exception)
01052 {
01053   std::string errmsg;
01054   bool success = false;
01055   try {
01056     edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
01057     if(edm::Service<FUShmDQMOutputService>().isAvailable())
01058       success = edm::Service<FUShmDQMOutputService>()->detachFromShm();
01059     if (!success) errmsg = "Failed to detach DQM service from shared memory";
01060   }
01061   catch (cms::Exception& e) {
01062     errmsg = "Failed to detach DQM service from shared memory: " + (std::string)e.what();
01063   }
01064   if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
01065 }
01066 
01067 
01068 std::string FUEventProcessor::logsAsString()
01069 {
01070   std::ostringstream oss;
01071   if(logWrap_)
01072     {
01073       for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
01074         oss << logRing_[i] << std::endl;
01075       for(unsigned int i = 0; i <  logRingIndex_; i++)
01076         oss << logRing_[i] << std::endl;
01077     }
01078   else
01079       for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
01080         oss << logRing_[i] << std::endl;
01081     
01082   return oss.str();
01083 }
01084   
01085 void FUEventProcessor::localLog(std::string m)
01086 {
01087   timeval tv;
01088 
01089   gettimeofday(&tv,0);
01090   tm *uptm = localtime(&tv.tv_sec);
01091   char datestring[256];
01092   strftime(datestring, sizeof(datestring),"%c", uptm);
01093 
01094   if(logRingIndex_ == 0){logWrap_ = true; logRingIndex_ = logRingSize_;}
01095   logRingIndex_--;
01096   std::ostringstream timestamp;
01097   timestamp << " at " << datestring;
01098   m += timestamp.str();
01099   logRing_[logRingIndex_] = m;
01100 }
01101 
01102 void FUEventProcessor::startSupervisorLoop()
01103 {
01104   try {
01105     wlSupervising_=
01106       toolbox::task::getWorkLoopFactory()->getWorkLoop("Supervisor",
01107                                                        "waiting");
01108     if (!wlSupervising_->isActive()) wlSupervising_->activate();
01109     asSupervisor_ = toolbox::task::bind(this,&FUEventProcessor::supervisor,
01110                                         "Supervisor");
01111     wlSupervising_->submit(asSupervisor_);
01112     supervising_ = true;
01113   }
01114   catch (xcept::Exception& e) {
01115     std::string msg = "Failed to start workloop 'Supervisor'.";
01116     XCEPT_RETHROW(evf::Exception,msg,e);
01117   }
01118 }
01119 
01120 void FUEventProcessor::startReceivingLoop()
01121 {
01122   try {
01123     wlReceiving_=
01124       toolbox::task::getWorkLoopFactory()->getWorkLoop("Receiving",
01125                                                        "waiting");
01126     if (!wlReceiving_->isActive()) wlReceiving_->activate();
01127     asReceiveMsgAndExecute_ = toolbox::task::bind(this,&FUEventProcessor::receiving,
01128                                         "Receiving");
01129     wlReceiving_->submit(asReceiveMsgAndExecute_);
01130     receiving_ = true;
01131   }
01132   catch (xcept::Exception& e) {
01133     std::string msg = "Failed to start workloop 'Receiving'.";
01134     XCEPT_RETHROW(evf::Exception,msg,e);
01135   }
01136 }
01137 void FUEventProcessor::startReceivingMonitorLoop()
01138 {
01139   try {
01140     wlReceivingMonitor_=
01141       toolbox::task::getWorkLoopFactory()->getWorkLoop("ReceivingM",
01142                                                        "waiting");
01143     if (!wlReceivingMonitor_->isActive()) 
01144       wlReceivingMonitor_->activate();
01145     asReceiveMsgAndRead_ = 
01146       toolbox::task::bind(this,&FUEventProcessor::receivingAndMonitor,
01147                           "ReceivingM");
01148     wlReceivingMonitor_->submit(asReceiveMsgAndRead_);
01149     receivingM_ = true;
01150   }
01151   catch (xcept::Exception& e) {
01152     std::string msg = "Failed to start workloop 'ReceivingM'.";
01153     XCEPT_RETHROW(evf::Exception,msg,e);
01154   }
01155 }
01156 
01157 bool FUEventProcessor::receiving(toolbox::task::WorkLoop *)
01158 {
01159   MsgBuf msg;
01160   try{
01161     myProcess_->rcvSlave(msg,false); //will receive only messages from Master
01162     if(msg->mtype==MSQM_MESSAGE_TYPE_RLI)
01163       {
01164         rlimit rl;
01165         getrlimit(RLIMIT_CORE,&rl);
01166         rl.rlim_cur=0;
01167         setrlimit(RLIMIT_CORE,&rl);
01168         rlimit_coresize_changed_=true;
01169       }
01170     if (msg->mtype==MSQM_MESSAGE_TYPE_RLR)
01171       {
01172         //reset coresize limit
01173         setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
01174         rlimit_coresize_changed_=false;
01175       }
01176     if(msg->mtype==MSQM_MESSAGE_TYPE_STOP)
01177       {
01178         pthread_mutex_lock(&stop_lock_);
01179         try {
01180           fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
01181         }
01182         catch (...) {
01183           LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to go to Stopping state in slave EP, pid "
01184                                                  << getpid() << " The state on Stop event was not consistent");
01185         }
01186         try{
01187           LOG4CPLUS_DEBUG(getApplicationLogger(),
01188                           "Trying to create message service presence ");
01189           edm::PresenceFactory *pf = edm::PresenceFactory::get();
01190           if(pf != 0) {
01191             pf->makePresence("MessageServicePresence").release();
01192           }
01193           else {
01194             LOG4CPLUS_ERROR(getApplicationLogger(),
01195                             "Unable to create message service presence ");
01196           }
01197         } 
01198         catch(...) {
01199           LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
01200         }
01201         try {
01202           stopClassic(); // call the normal sequence of stopping - as this is allowed to fail provisions must be made ...@@@EM
01203         }
01204         catch (...) {
01205           LOG4CPLUS_ERROR(getApplicationLogger(),"Slave EP " << getpid() << " stop failed. Process will now exit");
01206         }
01207 
01208         MsgBuf msg1(0,MSQS_MESSAGE_TYPE_STOP);
01209         myProcess_->postSlave(msg1,false);
01210         pthread_mutex_unlock(&stop_lock_);
01211         fclose(stdout);
01212         fclose(stderr);
01213         _exit(EXIT_SUCCESS);
01214       }
01215     if(msg->mtype==MSQM_MESSAGE_TYPE_FSTOP)
01216       _exit(EXIT_SUCCESS);
01217   }
01218   catch(evf::Exception &e){}
01219   return true;
01220 }
01221 
01222 bool FUEventProcessor::supervisor(toolbox::task::WorkLoop *)
01223 {
01224   pthread_mutex_lock(&stop_lock_);
01225   if(subs_.size()!=nbSubProcesses_.value_)
01226     {
01227       pthread_mutex_lock(&pickup_lock_);
01228       if(subs_.size()!=nbSubProcesses_.value_) {
01229         subs_.resize(nbSubProcesses_.value_);
01230         spMStates_.resize(nbSubProcesses_.value_);
01231         spmStates_.resize(nbSubProcesses_.value_);
01232         for(unsigned int i = 0; i < spMStates_.size(); i++)
01233           {
01234             spMStates_[i] = edm::event_processor::sInit; 
01235             spmStates_[i] = 0; 
01236           }
01237       }
01238       pthread_mutex_unlock(&pickup_lock_);
01239     }
01240   bool running = fsm_.stateName()->toString()=="Enabled";
01241   bool stopping = fsm_.stateName()->toString()=="stopping";
01242   for(unsigned int i = 0; i < subs_.size(); i++)
01243     {
01244       if(subs_[i].alive()==-1000) continue;
01245       int sl;
01246       pid_t sub_pid = subs_[i].pid();
01247       pid_t killedOrNot = waitpid(sub_pid,&sl,WNOHANG);
01248 
01249       if(killedOrNot && killedOrNot==sub_pid) {
01250         pthread_mutex_lock(&pickup_lock_);
01251         //check if out of range or recreated (enable can clear vector)
01252         if (i<subs_.size() && subs_[i].alive()!=-1000) {
01253           subs_[i].setStatus((WIFEXITED(sl) != 0 ? 0 : -1));
01254           std::ostringstream ost;
01255           if(subs_[i].alive()==0) ost << " process exited with status " << WEXITSTATUS(sl);
01256           else if(WIFSIGNALED(sl)!=0) {
01257             ost << " process terminated with signal " << WTERMSIG(sl);
01258           }
01259           else ost << " process stopped ";
01260           subs_[i].countdown()=slaveRestartDelaySecs_.value_;
01261           subs_[i].setReasonForFailed(ost.str());
01262           spMStates_[i] = evtProcessor_.notstarted_state_code();
01263           spmStates_[i] = 0;
01264           std::ostringstream ost1;
01265           ost1 << "-E- Slave " << subs_[i].pid() << ost.str();
01266           localLog(ost1.str());
01267           if(!autoRestartSlaves_.value_) subs_[i].disconnect();
01268         }
01269         pthread_mutex_unlock(&pickup_lock_);
01270       }
01271     }
01272   pthread_mutex_unlock(&stop_lock_);    
01273   if(stopping) return true; // if in stopping we are done
01274 
01275   // check if we need to reset core dumps (15 min after last one)
01276   if (running && rlimit_coresize_changed_) {
01277     timeval newtv;
01278     gettimeofday(&newtv,0);
01279     int delta = newtv.tv_sec-lastCrashTime_.tv_sec;
01280     if (delta>60*15) {
01281       std::ostringstream ostr;
01282       ostr << " No more slave EP crashes on this machine in last 15 min. resetting core size limits";
01283       std::cout << ostr.str() << std::endl;
01284       LOG4CPLUS_INFO(getApplicationLogger(),ostr.str());
01285       setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
01286       MsgBuf master_message_rlr_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLR);
01287       for (unsigned int i = 0; i < subs_.size(); i++) {
01288         try {
01289           if (subs_[i].alive())
01290             subs_[i].post(master_message_rlr_,false);
01291         }
01292         catch (...) {}
01293       }
01294       rlimit_coresize_changed_=false;
01295       crashesThisRun_=0;
01296     }
01297   }
01298 
01299   if(running && edm_init_done_)
01300     {
01301       // if enabled, this loop will periodically check if dead slaves countdown has expired and restart them
01302       // this is only active while running, hence, the stop lock is acquired and only released at end of loop
01303       if(autoRestartSlaves_.value_){
01304         pthread_mutex_lock(&stop_lock_); //lockout slave killing at stop while you check for restarts
01305         for(unsigned int i = 0; i < subs_.size(); i++)
01306           {
01307             if(subs_[i].alive() != 1){
01308               if(subs_[i].countdown() == 0)
01309                 {
01310                   if(subs_[i].restartCount()>2){
01311                     LOG4CPLUS_WARN(getApplicationLogger()," Not restarting subprocess in slot " << i 
01312                                    << " - maximum restart count reached");
01313                     std::ostringstream ost1;
01314                     ost1 << "-W- Dead Process in slot " << i << " reached maximum restart count"; 
01315                     localLog(ost1.str());
01316                     subs_[i].countdown()--;
01317                     XCEPT_DECLARE(evf::Exception,
01318                                   sentinelException, ost1.str());
01319                     notifyQualified("error",sentinelException);
01320                     subs_[i].disconnect();
01321                     continue;
01322                   }
01323                   subs_[i].restartCount()++;
01324                   if (forkInEDM_.value_) {
01325                     restartForkInEDM(i);
01326                   }
01327                   else {
01328                     pid_t rr = subs_[i].forkNew();
01329                     if(rr==0)
01330                     {
01331                       myProcess_=&subs_[i];
01332                       scalersUpdates_ = 0;
01333                       int retval = pthread_mutex_destroy(&stop_lock_);
01334                       if(retval != 0) perror("error");
01335                       retval = pthread_mutex_init(&stop_lock_,0);
01336                       if(retval != 0) perror("error");
01337                       fsm_.disableRcmsStateNotification();
01338                       fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
01339                       fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition
01340                       fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition
01341                       try{
01342                         xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
01343                         if(lsid) {
01344                           ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died
01345                         }
01346                       }
01347                       catch(...){
01348                         std::cout << "trouble with lsindex during restart" << std::endl;
01349                       }
01350                       try{
01351                         xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
01352                         if(lstb) {
01353                           ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting
01354                         }
01355                       }
01356                       catch(...){
01357                         std::cout << "trouble with resetting flag for eol recovery " << std::endl;
01358                       }
01359 
01360                       evtProcessor_.adjustLsIndexForRestart();
01361                       evtProcessor_.resetPackedTriggerReport();
01362                       enableMPEPSlave();
01363                       return false; // exit the supervisor loop immediately in the child !!!
01364                     }
01365                   else
01366                     {
01367                       std::ostringstream ost1;
01368                       ost1 << "-I- New Process " << rr << " forked for slot " << i; 
01369                       localLog(ost1.str());
01370                     }
01371                   }
01372                 }
01373               if(subs_[i].countdown()>=0) subs_[i].countdown()--;
01374             }
01375           }
01376         pthread_mutex_unlock(&stop_lock_);
01377       } // finished handling replacement of dead slaves once they've been reaped
01378     }
01379   xdata::Serializable *lsid = 0; 
01380   xdata::Serializable *psid = 0;
01381   xdata::Serializable *dqmp = 0;
01382   xdata::UnsignedInteger32 *dqm = 0;
01383 
01384 
01385   
01386   if(running && edm_init_done_){  
01387     try{
01388       lsid = applicationInfoSpace_->find("lumiSectionIndex");
01389       psid = applicationInfoSpace_->find("prescaleSetIndex");
01390       nbProcessed = monitorInfoSpace_->find("nbProcessed");
01391       nbAccepted  = monitorInfoSpace_->find("nbAccepted");
01392       dqmp = applicationInfoSpace_-> find("nbDqmUpdates");      
01393     }
01394     catch(xdata::exception::Exception e){
01395       LOG4CPLUS_INFO(getApplicationLogger(),"could not retrieve some data - " << e.what());    
01396     }
01397 
01398     try{
01399       if(nbProcessed !=0 && nbAccepted !=0)
01400         {
01401           xdata::UnsignedInteger32*nbp = ((xdata::UnsignedInteger32*)nbProcessed);
01402           xdata::UnsignedInteger32*nba = ((xdata::UnsignedInteger32*)nbAccepted);
01403           xdata::UnsignedInteger32*ls  = ((xdata::UnsignedInteger32*)lsid);
01404           xdata::UnsignedInteger32*ps  = ((xdata::UnsignedInteger32*)psid);
01405           if(dqmp!=0)
01406             dqm = (xdata::UnsignedInteger32*)dqmp;
01407           if(dqm) dqm->value_ = 0;
01408           nbTotalDQM_ = 0;
01409           nbp->value_ = 0;
01410           nba->value_ = 0;
01411           nblive_ = 0;
01412           nbdead_ = 0;
01413           scalersUpdates_ = 0;
01414 
01415           for(unsigned int i = 0; i < subs_.size(); i++)
01416             {
01417               if(subs_[i].alive()>0)
01418                 {
01419                   nblive_++;
01420                   try{
01421                     subs_[i].post(master_message_prg_,true);
01422                     
01423                     unsigned long retval = subs_[i].rcvNonBlocking(master_message_prr_,true);
01424                     if(retval == (unsigned long) master_message_prr_->mtype){
01425                       prg* p = (struct prg*)(master_message_prr_->mtext);
01426                       subs_[i].setParams(p);
01427                       spMStates_[i] = p->Ms;
01428                       spmStates_[i] = p->ms;
01429                       cpustat_->addEntry(p->ms);
01430                       if(!subs_[i].inInconsistentState() && 
01431                          (p->Ms == edm::event_processor::sError 
01432                           || p->Ms == edm::event_processor::sInvalid
01433                           || p->Ms == edm::event_processor::sStopping))
01434                         {
01435                           std::ostringstream ost;
01436                           ost << "edm::eventprocessor slot " << i << " process id " 
01437                               << subs_[i].pid() << " not in Running state : Mstate=" 
01438                               << evtProcessor_.stateNameFromIndex(p->Ms) << " mstate="
01439                               << evtProcessor_.moduleNameFromIndex(p->ms) 
01440                               << " - Look into possible error messages from HLT process";
01441                           LOG4CPLUS_WARN(getApplicationLogger(),ost.str());
01442                         }
01443                       nbp->value_ += subs_[i].params().nbp;
01444                       nba->value_  += subs_[i].params().nba;
01445                       if(dqm)dqm->value_ += p->dqm;
01446                       nbTotalDQM_ +=  p->dqm;
01447                       scalersUpdates_ += p->trp;
01448                       if(p->ls > ls->value_) ls->value_ = p->ls;
01449                       if(p->ps != ps->value_) ps->value_ = p->ps;
01450                     }
01451                     else{
01452                       nbp->value_ += subs_[i].get_save_nbp();
01453                       nba->value_ += subs_[i].get_save_nba();
01454                     }
01455                   } 
01456                   catch(evf::Exception &e){
01457                     LOG4CPLUS_INFO(getApplicationLogger(),
01458                                    "could not send/receive msg on slot " 
01459                                    << i << " - " << e.what());    
01460                   }
01461                     
01462                 }
01463               else
01464                 {
01465                   nbp->value_ += subs_[i].get_save_nbp();
01466                   nba->value_ += subs_[i].get_save_nba();
01467                   nbdead_++;
01468                 }
01469             }
01470           if(nbp->value_>64){//have some slaves already processed more than one event ? (eventually make this == number of raw cells)
01471             for(unsigned int i = 0; i < subs_.size(); i++)
01472               {
01473                 if(subs_[i].params().nbp == 0){ // a slave has processed 0 events 
01474                   // check that the process is not stuck
01475                   if(subs_[i].alive()>0 && subs_[i].params().ms == 0) // the process is seen alive but in us=Invalid(0)
01476                     {
01477                       subs_[i].found_invalid();//increase the "found_invalid" counter
01478                       if(subs_[i].nfound_invalid() > 60){ //wait x monitor cycles (~1 min a good time ?) before doing something about a stuck slave
01479                         MsgBuf msg3(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_FSTOP);      // send a force-stop signal             
01480                         subs_[i].post(msg3,false);
01481                         std::ostringstream ost1;
01482                         ost1 << "-W- Process in slot " << i << " Never reached the running state - forcestopping it"; 
01483                         localLog(ost1.str());
01484                         LOG4CPLUS_ERROR(getApplicationLogger(),ost1.str());    
01485                         XCEPT_DECLARE(evf::Exception,
01486                                       sentinelException, ost1.str());
01487                         notifyQualified("error",sentinelException);
01488 
01489                       }
01490                     }
01491                 }
01492               }
01493           }
01494         }
01495     }
01496     catch(std::exception &e){
01497       LOG4CPLUS_INFO(getApplicationLogger(),"std exception - " << e.what());    
01498     }
01499     catch(...){
01500       LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception ");    
01501     }
01502   }
01503   else{
01504     for(unsigned int i = 0; i < subs_.size(); i++)
01505       {
01506         if(subs_[i].alive()==-1000)
01507           {
01508             spMStates_[i] = edm::event_processor::sInit;
01509             spmStates_[i] = 0;
01510           }
01511       }
01512   }
01513   try{
01514     monitorInfoSpace_->lock();
01515     monitorInfoSpace_->fireItemGroupChanged(names_,0);
01516     monitorInfoSpace_->unlock();
01517   }
01518   catch(xdata::exception::Exception &e)
01519     {
01520       LOG4CPLUS_ERROR(log_, "Exception from fireItemGroupChanged: " << e.what());
01521       //        localLog(e.what());
01522     }
01523   ::sleep(superSleepSec_.value_);       
01524   return true;
01525 }
01526 
01527 void FUEventProcessor::startScalersWorkLoop() throw (evf::Exception)
01528 {
01529   try {
01530     wlScalers_=
01531       toolbox::task::getWorkLoopFactory()->getWorkLoop("Scalers",
01532                                                        "waiting");
01533     if (!wlScalers_->isActive()) wlScalers_->activate();
01534     asScalers_ = toolbox::task::bind(this,&FUEventProcessor::scalers,
01535                                      "Scalers");
01536     
01537   wlScalers_->submit(asScalers_);
01538   wlScalersActive_ = true;
01539   }
01540   catch (xcept::Exception& e) {
01541     std::string msg = "Failed to start workloop 'Scalers'.";
01542     XCEPT_RETHROW(evf::Exception,msg,e);
01543   }
01544 }
01545 
01546 //______________________________________________________________________________
01547 
01548 void FUEventProcessor::startSummarizeWorkLoop() throw (evf::Exception)
01549 {
01550   try {
01551     wlSummarize_=
01552       toolbox::task::getWorkLoopFactory()->getWorkLoop("Summary",
01553                                                        "waiting");
01554     if (!wlSummarize_->isActive()) wlSummarize_->activate();
01555     
01556     asSummarize_ = toolbox::task::bind(this,&FUEventProcessor::summarize,
01557                                        "Summary");
01558 
01559     wlSummarize_->submit(asSummarize_);
01560     wlSummarizeActive_ = true;
01561   }
01562   catch (xcept::Exception& e) {
01563     std::string msg = "Failed to start workloop 'Summarize'.";
01564     XCEPT_RETHROW(evf::Exception,msg,e);
01565   }
01566 }
01567 
01568 //______________________________________________________________________________
01569 
01570 bool FUEventProcessor::scalers(toolbox::task::WorkLoop* wl)
01571 {
01572   if(evtProcessor_)
01573     {
01574       if(!evtProcessor_.getTriggerReport(true)) {
01575         wlScalersActive_ = false;
01576         return false;
01577       }
01578     }
01579   else
01580     {
01581       std::cout << getpid()<< " Scalers workloop, bailing out, no evtProcessor " << std::endl;
01582       wlScalersActive_ = false;
01583       return false;
01584     }
01585   if(myProcess_) 
01586     {
01587       //      std::cout << getpid() << "going to post on control queue from scalers" << std::endl;
01588       int ret = myProcess_->postSlave(evtProcessor_.getPackedTriggerReport(),false);
01589       if(ret!=0)      std::cout << "scalers workloop, error posting to sqs_ " << errno << std::endl;
01590       scalersUpdates_++;
01591     }
01592   else
01593     evtProcessor_.fireScalersUpdate();
01594   return true;
01595 }
01596 
01597 //______________________________________________________________________________
01598 bool FUEventProcessor::summarize(toolbox::task::WorkLoop* wl)
01599 {
01600   evtProcessor_.resetPackedTriggerReport();
01601   bool atLeastOneProcessUpdatedSuccessfully = false;
01602   int msgCount = 0;
01603   for (unsigned int i = 0; i < subs_.size(); i++)
01604     {
01605       if(subs_[i].alive()>0)
01606         {
01607           int ret = 0;
01608           if(subs_[i].check_postponed_trigger_update(master_message_trr_,
01609                                                      evtProcessor_.getLumiSectionReferenceIndex()))
01610             {
01611               ret = MSQS_MESSAGE_TYPE_TRR;
01612               std::cout << "using postponed report from slot " << i << " for ls " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01613             }
01614           else{
01615             bool insync = false;
01616             bool exception_caught = false;
01617             while(!insync){
01618               try{
01619                 ret = subs_[i].rcv(master_message_trr_,false);
01620               }
01621               catch(evf::Exception &e)
01622                 {
01623                   std::cout << "exception in msgrcv on " << i 
01624                             << " " << subs_[i].alive() << " " << strerror(errno) << std::endl;
01625                   exception_caught = true;
01626                   break;
01627                   //do nothing special
01628                 }
01629               if(ret==MSQS_MESSAGE_TYPE_TRR) {
01630                 TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
01631                 if(trp->lumiSection >= evtProcessor_.getLumiSectionReferenceIndex()){
01632                   insync = true;
01633                 }
01634               }
01635             }
01636             if(exception_caught) continue;
01637           }
01638           msgCount++;
01639           if(ret==MSQS_MESSAGE_TYPE_TRR) {
01640             TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
01641             if(trp->lumiSection > evtProcessor_.getLumiSectionReferenceIndex()){
01642               std::cout << "postpone handling of msg from slot " << i << " with Ls " <<  trp->lumiSection
01643                         << " should be " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01644               subs_[i].add_postponed_trigger_update(master_message_trr_);
01645             }else{
01646               atLeastOneProcessUpdatedSuccessfully = true;
01647               evtProcessor_.sumAndPackTriggerReport(master_message_trr_);
01648             }
01649           }
01650           else std::cout << "msgrcv returned error " << errno << std::endl;
01651         }
01652     }
01653   if(atLeastOneProcessUpdatedSuccessfully){
01654     nbSubProcessesReporting_.value_ = msgCount;
01655     evtProcessor_.getPackedTriggerReportAsStruct()->nbExpected = nbSubProcesses_.value_;
01656     evtProcessor_.getPackedTriggerReportAsStruct()->nbReporting = nbSubProcessesReporting_.value_;
01657     evtProcessor_.updateRollingReport();
01658     evtProcessor_.fireScalersUpdate();
01659   }
01660   else{
01661     LOG4CPLUS_WARN(getApplicationLogger(),"Summarize loop: no process updated successfully - sleep 10 seconds before trying again");          
01662     if(msgCount==0) evtProcessor_.withdrawLumiSectionIncrement();
01663     nbSubProcessesReporting_.value_ = 0;
01664     ::sleep(10);
01665   }
01666   if(fsm_.stateName()->toString()!="Enabled"){
01667     wlScalersActive_ = false;
01668     return false;
01669   }
01670   //  cpustat_->printStat();
01671   if(iDieStatisticsGathering_.value_){
01672     try{
01673       unsigned long long idleTmp=idleProcStats_;
01674       unsigned long long allPSTmp=allProcStats_;
01675       idleProcStats_=allProcStats_=0;
01676 
01677       utils::procCpuStat(idleProcStats_,allProcStats_);
01678       if (allPSTmp!=0 && idleTmp!=0 && allProcStats_!=allPSTmp) {
01679         cpustat_->setCPUStat(1000 - ((idleProcStats_-idleTmp)*1000)/(allProcStats_-allPSTmp));
01680         //std::cout << " got proc/stat result of " << 1000-((idleProcStats_-idleTmp)*1000)/(allProcStats_-allPSTmp) << " of 1000 " << std::endl;
01681       }
01682       else cpustat_->setCPUStat(0);
01683 
01684       TriggerReportStatic *trsp = evtProcessor_.getPackedTriggerReportAsStruct();
01685       cpustat_ ->setNproc(trsp->eventSummary.totalEvents);
01686       cpustat_ ->sendStat(evtProcessor_.getLumiSectionReferenceIndex());
01687       ratestat_->sendStat((unsigned char*)trsp,
01688                           sizeof(TriggerReportStatic),
01689                           evtProcessor_.getLumiSectionReferenceIndex());
01690     }catch(evf::Exception &e){
01691       LOG4CPLUS_INFO(getApplicationLogger(),"coud not send statistics"
01692                      << e.what());
01693     }
01694   }
01695   cpustat_->reset();
01696   return true;
01697 }
01698 
01699 
01700 
01701 bool FUEventProcessor::receivingAndMonitor(toolbox::task::WorkLoop *)
01702 {
01703   try{
01704     myProcess_->rcvSlave(slave_message_monitoring_,true); //will receive only messages from Master
01705     switch(slave_message_monitoring_->mtype)
01706       {
01707       case MSQM_MESSAGE_TYPE_MCS:
01708         {
01709           xgi::Input *in = 0;
01710           xgi::Output out;
01711           evtProcessor_.microState(in,&out);
01712           MsgBuf msg1(out.str().size(),MSQS_MESSAGE_TYPE_MCR);
01713           strncpy(msg1->mtext,out.str().c_str(),out.str().size());
01714           myProcess_->postSlave(msg1,true);
01715           break;
01716         }
01717       
01718       case MSQM_MESSAGE_TYPE_PRG:
01719         {
01720           xdata::Serializable *dqmp = 0;
01721           xdata::UnsignedInteger32 *dqm = 0;
01722           evtProcessor_.monitoring(0);
01723           try{
01724             dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
01725           }  catch(xdata::exception::Exception e){}
01726           if(dqmp!=0)
01727             dqm = (xdata::UnsignedInteger32*)dqmp;
01728 
01729           //      monitorInfoSpace_->lock();  
01730           prg * data           = (prg*)slave_message_prr_->mtext;
01731           data->ls             = evtProcessor_.lsid_;
01732           data->eols           = evtProcessor_.lastLumiUsingEol_;
01733           data->ps             = evtProcessor_.psid_;
01734           data->nbp            = evtProcessor_->totalEvents();
01735           data->nba            = evtProcessor_->totalEventsPassed();
01736           data->Ms             = evtProcessor_.epMAltState_.value_;
01737           data->ms             = evtProcessor_.epmAltState_.value_;
01738           if(dqm) data->dqm    = dqm->value_; else data->dqm = 0;
01739           data->trp            = scalersUpdates_;
01740           //      monitorInfoSpace_->unlock();  
01741           myProcess_->postSlave(slave_message_prr_,true);
01742           if(exitOnError_.value_)
01743           { 
01744             // after each monitoring cycle check if we are in inconsistent state and exit if configured to do so  
01745             //      std::cout << getpid() << "receivingAndMonitor: trying to acquire stop lock " << std::endl;
01746             if(data->Ms == edm::event_processor::sStopping || data->Ms == edm::event_processor::sError) 
01747               { 
01748                 bool running = true;
01749                 int count = 0;
01750                 while(running){
01751                   int retval = pthread_mutex_lock(&stop_lock_);
01752                   if(retval != 0) perror("error");
01753                   running = fsm_.stateName()->toString()=="Enabled";
01754                   if(count>5) _exit(-1);
01755                   pthread_mutex_unlock(&stop_lock_);
01756                   if(running) {::sleep(1); count++;}
01757                 }
01758               }
01759             
01760           }
01761           //      scalersUpdates_++;
01762           break;
01763         }
01764       case MSQM_MESSAGE_TYPE_WEB:
01765         {
01766           xgi::Input  *in = 0;
01767           xgi::Output out;
01768           unsigned int bytesToSend = 0;
01769           MsgBuf msg1(NUMERIC_MESSAGE_SIZE,MSQS_MESSAGE_TYPE_WEB);
01770           std::string query = slave_message_monitoring_->mtext;
01771           size_t pos = query.find_first_of("&");
01772           std::string method;
01773           std::string args;
01774           if(pos!=std::string::npos)  
01775             {
01776               method = query.substr(0,pos);
01777               args = query.substr(pos+1,query.length()-pos-1);
01778             }
01779           else
01780             method=query;
01781 
01782           if(method=="Spotlight")
01783             {
01784               spotlightWebPage(in,&out);
01785             }
01786           else if(method=="procStat")
01787             {
01788               procStat(in,&out);
01789             }
01790           else if(method=="moduleWeb")
01791             {
01792               internal::MyCgi mycgi;
01793               boost::char_separator<char> sep(";");
01794               boost::tokenizer<boost::char_separator<char> > tokens(args, sep);
01795               for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
01796                    tok_iter != tokens.end(); ++tok_iter){
01797                 size_t pos = (*tok_iter).find_first_of("%");
01798                 if(pos != std::string::npos){
01799                   std::string first  = (*tok_iter).substr(0    ,                        pos);
01800                   std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
01801                   mycgi.getEnvironment()[first]=second;
01802                 }
01803               }
01804               moduleWeb(&mycgi,&out);
01805             }
01806           else if(method=="Default")
01807             {
01808               defaultWebPage(in,&out);
01809             }
01810           else 
01811             {
01812               out << "Error 404!!!!!!!!" << std::endl;
01813             }
01814 
01815 
01816           bytesToSend = out.str().size();
01817           unsigned int cycle = 0;
01818           if(bytesToSend==0)
01819             {
01820               snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", bytesToSend);
01821               myProcess_->postSlave(msg1,true);
01822             }
01823           while(bytesToSend !=0){
01824             unsigned int msgSize = bytesToSend>MAX_PIPE_BUFFER_SIZE ? MAX_PIPE_BUFFER_SIZE : bytesToSend;
01825             write(anonymousPipe_[PIPE_WRITE],
01826                   out.str().c_str()+MAX_PIPE_BUFFER_SIZE*cycle,
01827                   msgSize);
01828             snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", msgSize);
01829             myProcess_->postSlave(msg1,true);
01830             bytesToSend -= msgSize;
01831             cycle++;
01832           }
01833           break;
01834         }
01835       case MSQM_MESSAGE_TYPE_TRP:
01836         {
01837           break;
01838         }
01839       }
01840   }
01841   catch(evf::Exception &e){std::cout << "exception caught in recevingM: " << e.what() << std::endl;}
01842   return true;
01843 }
01844 
01845 void FUEventProcessor::startSignalMonitorWorkLoop() throw (evf::Exception)
01846 {
01847   //todo rewind/check semaphore
01848   //start workloop
01849   try {
01850     wlSignalMonitor_=
01851       toolbox::task::getWorkLoopFactory()->getWorkLoop("SignalMonitor",
01852                                                        "waiting");
01853 
01854     if (!wlSignalMonitor_->isActive()) wlSignalMonitor_->activate();
01855     asSignalMonitor_ = toolbox::task::bind(this,&FUEventProcessor::sigmon,
01856                                        "SignalMonitor");
01857     wlSignalMonitor_->submit(asSignalMonitor_);
01858     signalMonitorActive_ = true;
01859   }
01860   catch (xcept::Exception& e) {
01861     std::string msg = "Failed to start workloop 'SignalMonitor'. (3)";
01862     std::cout << e.what() << std::endl;
01863     XCEPT_RETHROW(evf::Exception,msg,e);
01864   }
01865 }
01866  
01867 
01868 bool FUEventProcessor::sigmon(toolbox::task::WorkLoop* wl)
01869 {
01870   while (1) {
01871     sem_wait(sigmon_sem_);
01872     std::cout << " received signal notification from slave!"<< std::endl;
01873   
01874     //check if shutdown time
01875     bool running = fsm_.stateName()->toString()=="Enabled";
01876     bool stopping = fsm_.stateName()->toString()=="stopping";
01877     bool enabling = fsm_.stateName()->toString()=="enabling";
01878     if (!running && !enabling) {
01879       signalMonitorActive_ = false;
01880       return false;
01881     }
01882 
01883     crashesThisRun_++;
01884     gettimeofday(&lastCrashTime_,0);
01885 
01886     //set core size limit to 0 in master and slaves
01887     if (crashesThisRun_>=crashesToDump_.value_ && (running || stopping) && !rlimit_coresize_changed_) {
01888 
01889       rlimit rlold;
01890       getrlimit(RLIMIT_CORE,&rlold);
01891       rlimit rlnew = rlold;
01892       rlnew.rlim_cur=0;
01893       setrlimit(RLIMIT_CORE,&rlnew);
01894       rlimit_coresize_changed_=true;
01895       MsgBuf master_message_rli_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLI);
01896       //in case of frequent crashes, allow first slot to dump (until restart)
01897       unsigned int min=1;
01898       for (unsigned int i = min; i < subs_.size(); i++) {
01899         try {
01900           if (subs_[i].alive()) {
01901             subs_[i].post(master_message_rli_,false);
01902           }
01903         }
01904         catch (...) {}
01905       }
01906       std::ostringstream ostr;
01907       ostr << "Number of recent slave crashes reaches " << crashesThisRun_
01908            << ". Disabling core dumps for next 15 minutes in this FilterUnit";
01909       LOG4CPLUS_WARN(getApplicationLogger(),ostr.str());
01910     }
01911   }//end while loop
01912   signalMonitorActive_ = false;
01913   return false;
01914 }
01915 
01916 
01917 bool FUEventProcessor::enableCommon()
01918 {
01919   try {    
01920     if(hasShMem_) attachDqmToShm();
01921     int sc = 0;
01922     evtProcessor_->clearCounters();
01923     if(isRunNumberSetter_)
01924       evtProcessor_->setRunNumber(runNumber_.value_);
01925     else
01926       evtProcessor_->declareRunNumber(runNumber_.value_);
01927 
01928     try{
01929       ::sleep(1);
01930       evtProcessor_->runAsync();
01931       sc = evtProcessor_->statusAsync();
01932     }
01933     catch(cms::Exception &e) {
01934       reasonForFailedState_ = e.explainSelf();
01935       fsm_.fireFailed(reasonForFailedState_,this);
01936       localLog(reasonForFailedState_);
01937       return false;
01938     }    
01939     catch(std::exception &e) {
01940       reasonForFailedState_  = e.what();
01941       fsm_.fireFailed(reasonForFailedState_,this);
01942       localLog(reasonForFailedState_);
01943       return false;
01944     }
01945     catch(...) {
01946       reasonForFailedState_ = "Unknown Exception";
01947       fsm_.fireFailed(reasonForFailedState_,this);
01948       localLog(reasonForFailedState_);
01949       return false;
01950     }
01951     if(sc != 0) {
01952       std::ostringstream oss;
01953       oss<<"EventProcessor::runAsync returned status code " << sc;
01954       reasonForFailedState_ = oss.str();
01955       fsm_.fireFailed(reasonForFailedState_,this);
01956       localLog(reasonForFailedState_);
01957       return false;
01958     }
01959   }
01960   catch (xcept::Exception &e) {
01961     reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
01962     fsm_.fireFailed(reasonForFailedState_,this);
01963     localLog(reasonForFailedState_);
01964     return false;
01965   }
01966   try{
01967     fsm_.fireEvent("EnableDone",this);
01968   }
01969   catch (xcept::Exception &e) {
01970     std::cout << "exception " << (std::string)e.what() << std::endl;
01971     throw;
01972   }
01973 
01974   return false;
01975 }
01976 
01977 void FUEventProcessor::forkProcessFromEDM_helper(void * addr) {
01978   ((FUEventProcessor*)addr)->forkProcessesFromEDM();
01979 }
01980 
01981 void FUEventProcessor::forkProcessesFromEDM() {
01982 
01983   moduleweb::ForkParams * forkParams = &(forkInfoObj_->forkParams);
01984   unsigned int forkFrom=0;
01985   unsigned int forkTo=nbSubProcesses_.value_;
01986   if (forkParams->slotId>=0) {
01987     forkFrom=forkParams->slotId;
01988     forkTo=forkParams->slotId+1;
01989   }
01990 
01991   //before fork, make sure to disconnect output modules from Shm
01992   try {
01993     if (sorRef_) {
01994       std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
01995       for (unsigned int i=0;i<shmOutputs.size();i++) {
01996         //unregister PID from ShmBuffer/RB
01997         shmOutputs[i]->unregisterFromShm();
01998         //disconnect from Shm
01999         shmOutputs[i]->stop();
02000       }
02001     }
02002   }
02003   catch (std::exception &e)
02004   {
02005     reasonForFailedState_ =  (std::string)"Thrown exception while disconnecting ShmOutputModule from Shm: " + e.what();
02006     LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02007     fsm_.fireFailed(reasonForFailedState_,this);
02008     localLog(reasonForFailedState_);
02009   }
02010   catch (...) {
02011     reasonForFailedState_ =  "Thrown unknown exception while disconnecting ShmOutputModule from Shm: ";
02012     LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02013     fsm_.fireFailed(reasonForFailedState_,this);
02014     localLog(reasonForFailedState_);
02015   }
02016 
02017   //fork loop
02018   if (fsm_.stateName()->toString()=="stopping") {
02019     LOG4CPLUS_ERROR(getApplicationLogger(),"Can not fork subprocesses in state " << fsm_.stateName()->toString());
02020     forkParams->isMaster=1;
02021     forkInfoObj_->forkParams.slotId=-1;
02022     forkInfoObj_->forkParams.restart=0;
02023   }
02024   else for(unsigned int i=forkFrom; i<forkTo; i++)
02025   {
02026 
02027     int retval = subs_[i].forkNew();
02028     if(retval==0)
02029     {
02030 
02031       forkParams->isMaster=0;
02032       myProcess_ = &subs_[i];
02033       // dirty hack: delete/recreate global binary semaphore for later use in child
02034       delete toolbox::mem::_s_mutex_ptr_;
02035       toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
02036       int retval = pthread_mutex_destroy(&stop_lock_);
02037       if(retval != 0) perror("error");
02038       retval = pthread_mutex_init(&stop_lock_,0);
02039       if(retval != 0) perror("error");
02040       fsm_.disableRcmsStateNotification();
02041 
02042       try{
02043         LOG4CPLUS_DEBUG(getApplicationLogger(),
02044             "Trying to create message service presence ");
02045         //release the presense factory in master
02046         edm::PresenceFactory *pf = edm::PresenceFactory::get();
02047         if(pf != 0) {
02048           pf->makePresence("MessageServicePresence").release();
02049         }
02050         else {
02051           LOG4CPLUS_ERROR(getApplicationLogger(),
02052               "Unable to create message service presence ");
02053         }
02054       }
02055       catch(...) {
02056         LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception in MessageServicePresence");
02057       }
02058 
02059       ML::MLlog4cplus::setAppl(this);
02060 
02061       //reconnect to Shm from output modules
02062       try {
02063         if (sorRef_) {
02064           std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
02065           for (unsigned int i=0;i<shmOutputs.size();i++)
02066             shmOutputs[i]->start();
02067         }
02068       }
02069       catch (...)
02070       {
02071         LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception (ShmOutputModule sending InitMsg (pid:"<<getpid() <<")");
02072       }
02073 
02074       if (forkParams->restart) {
02075         //do restart things
02076         scalersUpdates_ = 0;
02077         try {
02078           fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
02079           fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition
02080           fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition
02081         } catch (...) {
02082           LOG4CPLUS_WARN(getApplicationLogger(),"Failed to Stop/Enable FSM of the restarted slave EP");
02083         }
02084         try{
02085           xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
02086           if(lsid) {
02087             ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died
02088           }
02089         }
02090         catch(...){
02091           std::cout << "trouble with lsindex during restart" << std::endl;
02092         }
02093         try{
02094           xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
02095           if(lstb) {
02096             ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting
02097           }
02098         }
02099         catch(...){
02100           std::cout << "trouble with resetting flag for eol recovery " << std::endl;
02101         }
02102         evtProcessor_.adjustLsIndexForRestart();
02103         evtProcessor_.resetPackedTriggerReport();
02104       }
02105 
02106       //start other threads
02107       startReceivingLoop();
02108       startReceivingMonitorLoop();
02109       startScalersWorkLoop();
02110       while(!evtProcessor_.isWaitingForLs())
02111         ::usleep(100000);//wait for scalers loop to start
02112 
02113       //connect DQMShmOutputModule
02114       if(hasShMem_) attachDqmToShm();
02115 
02116       //catch transition error if we are already Enabled
02117       try {
02118         fsm_.fireEvent("EnableDone",this);
02119       }
02120       catch (...) {}
02121 
02122       //make sure workloops are started
02123       while (!wlReceiving_->isActive() || !wlReceivingMonitor_->isActive()) usleep(10000);
02124 
02125       //unmask signals
02126       sigset_t tmpset_thread;
02127       sigemptyset(&tmpset_thread);
02128       sigaddset(&tmpset_thread, SIGQUIT);
02129       sigaddset(&tmpset_thread, SIGILL);
02130       sigaddset(&tmpset_thread, SIGABRT);
02131       sigaddset(&tmpset_thread, SIGFPE);
02132       sigaddset(&tmpset_thread, SIGSEGV);
02133       sigaddset(&tmpset_thread, SIGALRM);
02134       //sigprocmask(SIG_UNBLOCK, &tmpset_thread, 0);
02135       pthread_sigmask(SIG_UNBLOCK,&tmpset_thread,0);
02136      
02137       //set signal handlers 
02138       struct sigaction sa;
02139       sigset_t tmpset;
02140       memset(&tmpset,0,sizeof(tmpset));
02141       sigemptyset(&tmpset);
02142       sa.sa_mask=tmpset;
02143       sa.sa_flags=SA_RESETHAND | SA_SIGINFO;
02144       sa.sa_handler=0;
02145       sa.sa_sigaction=evfep_sighandler;
02146 
02147       sigaction(SIGQUIT,&sa,0);
02148       sigaction(SIGILL,&sa,0);
02149       sigaction(SIGABRT,&sa,0);
02150       sigaction(SIGFPE,&sa,0);
02151       sigaction(SIGSEGV,&sa,0);
02152       sa.sa_sigaction=evfep_alarmhandler;
02153       sigaction(SIGALRM,&sa,0);
02154 
02155       //child return to DaqSource
02156       return ;
02157     }
02158     else {
02159       forkParams->isMaster=1;
02160       forkInfoObj_->forkParams.slotId=-1;
02161       forkInfoObj_->forkParams.restart=0;
02162       if (forkParams->restart) {
02163         std::ostringstream ost1;
02164         ost1 << "-I- New Process " << retval << " forked for slot " << forkParams->slotId;
02165         localLog(ost1.str());
02166       }
02167       //start "crash" receiver workloop
02168     }
02169   }
02170   edm_init_done_=true;
02171 }
02172 
02173 bool FUEventProcessor::enableForkInEDM() 
02174 {
02175   evtProcessor_.resetWaiting();
02176   try {
02177     //set to connect to Shm later
02178     //if(hasShMem_) setAttachDqmToShm();
02179 
02180     int sc = 0;
02181     //maybe not needed in MP mode
02182     evtProcessor_->clearCounters();
02183     if(isRunNumberSetter_)
02184       evtProcessor_->setRunNumber(runNumber_.value_);
02185     else
02186       evtProcessor_->declareRunNumber(runNumber_.value_);
02187 
02188     //prepare object used to communicate with DaqSource
02189     pthread_mutex_destroy(&forkObjLock_);
02190     pthread_mutex_init(&forkObjLock_,0);
02191     if (forkInfoObj_) delete forkInfoObj_;
02192     forkInfoObj_ = new moduleweb::ForkInfoObj();
02193     forkInfoObj_->mst_lock_=&forkObjLock_;
02194     forkInfoObj_->fuAddr=(void*)this;
02195     forkInfoObj_->forkHandler = forkProcessFromEDM_helper;
02196     forkInfoObj_->forkParams.slotId=-1;
02197     forkInfoObj_->forkParams.restart=0;
02198     forkInfoObj_->forkParams.isMaster=-1;
02199     forkInfoObj_->stopCondition=0;
02200     if (mwrRef_)
02201       mwrRef_->publishForkInfo(std::string("DaqSource"),forkInfoObj_);
02202 
02203     evtProcessor_->runAsync();
02204     sc = evtProcessor_->statusAsync();
02205 
02206     if(sc != 0) {
02207       std::ostringstream oss;
02208       oss<<"EventProcessor::runAsync returned status code " << sc;
02209       reasonForFailedState_ = oss.str();
02210       fsm_.fireFailed(reasonForFailedState_,this);
02211       LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02212       return false;
02213     }
02214   }
02215   //catch exceptions on master side
02216   catch(cms::Exception &e) {
02217     reasonForFailedState_ = e.explainSelf();
02218     fsm_.fireFailed(reasonForFailedState_,this);
02219     LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02220     return false;
02221   }    
02222   catch(std::exception &e) {
02223     reasonForFailedState_  = e.what();
02224     fsm_.fireFailed(reasonForFailedState_,this);
02225     LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02226     return false;
02227   }
02228   catch(...) {
02229     reasonForFailedState_ = "Unknown Exception";
02230     fsm_.fireFailed(reasonForFailedState_,this);
02231     LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02232     return false;
02233   }
02234   return true;
02235 }
02236 
02237 bool FUEventProcessor::restartForkInEDM(unsigned int slotId) {
02238   //daqsource will keep this lock until master returns after fork
02239   //so that we don't do another EP restart in between
02240   forkInfoObj_->lock();
02241   forkInfoObj_->forkParams.slotId=slotId;
02242   forkInfoObj_->forkParams.restart=true;
02243   forkInfoObj_->forkParams.isMaster=1;
02244   forkInfoObj_->stopCondition=0;
02245   LOG4CPLUS_DEBUG(log_, " restarting subprocess in slot "<< slotId <<": posting on control semaphore");
02246   sem_post(forkInfoObj_->control_sem_);
02247   forkInfoObj_->unlock();
02248   usleep(1000);
02249   return true;
02250 }
02251 
02252 bool FUEventProcessor::enableClassic()
02253 {
02254   bool retval = enableCommon();
02255   while(evtProcessor_->getState()!= edm::event_processor::sRunning){
02256     LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
02257     ::sleep(1);
02258   }
02259   
02260   //  implementation moved to EPWrapper
02261   //  startScalersWorkLoop(); // this is now not done any longer 
02262   localLog("-I- Start completed");
02263   return retval;
02264 }
02265 bool FUEventProcessor::enableMPEPSlave()
02266 {
02267   //all this happens only in the child process
02268 
02269   startReceivingLoop();
02270   startReceivingMonitorLoop();
02271   evtProcessor_.resetWaiting();
02272   startScalersWorkLoop();
02273   while(!evtProcessor_.isWaitingForLs())
02274     ::sleep(1);
02275 
02276   // @EM test do not run monitor loop in slave, only receiving&Monitor
02277   //  evtProcessor_.startMonitoringWorkLoop();
02278   try{
02279     //    evtProcessor_.makeServicesOnly();
02280     try{
02281       LOG4CPLUS_DEBUG(getApplicationLogger(),
02282                       "Trying to create message service presence ");
02283       edm::PresenceFactory *pf = edm::PresenceFactory::get();
02284       if(pf != 0) {
02285         pf->makePresence("MessageServicePresence").release();
02286       }
02287       else {
02288         LOG4CPLUS_ERROR(getApplicationLogger(),
02289                         "Unable to create message service presence ");
02290       }
02291     } 
02292     catch(...) {
02293       LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
02294     }
02295   ML::MLlog4cplus::setAppl(this);
02296   }       
02297   catch (xcept::Exception &e) {
02298     reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
02299     fsm_.fireFailed(reasonForFailedState_,this);
02300     localLog(reasonForFailedState_);
02301   }
02302   bool retval =  enableCommon();
02303   //  while(evtProcessor_->getState()!= edm::event_processor::sRunning){
02304   //    LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
02305   //    ::sleep(1);
02306   //  }
02307   return retval;
02308 }
02309 
02310 bool FUEventProcessor::stopClassic()
02311 {
02312   bool failed=false;
02313   try {
02314     LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping :) ...");
02315     edm::EventProcessor::StatusCode rc = evtProcessor_.stop();
02316     if(rc == edm::EventProcessor::epSuccess) 
02317       fsm_.fireEvent("StopDone",this);
02318     else
02319       {
02320         failed=true;
02321         //      epMState_ = evtProcessor_->currentStateName();
02322         if(rc == edm::EventProcessor::epTimedOut)
02323           reasonForFailedState_ = "EventProcessor stop timed out";
02324         else
02325           reasonForFailedState_ = "EventProcessor did not receive STOP event";
02326         fsm_.fireFailed(reasonForFailedState_,this);
02327         localLog(reasonForFailedState_);
02328       }
02329 
02330     if (failed) LOG4CPLUS_WARN(getApplicationLogger(),"STOP failed: try detaching from Shm");
02331 
02332     //detaching from shared memory
02333     if(hasShMem_) detachDqmFromShm();
02334 
02335     if (failed) LOG4CPLUS_WARN(getApplicationLogger(),"STOP failed: detached from Shm");
02336   }
02337   catch (xcept::Exception &e) {
02338     failed=true;
02339     reasonForFailedState_ = "stopping FAILED: " + (std::string)e.what();
02340   }
02341   catch (edm::Exception &e) {
02342     failed=true;
02343     reasonForFailedState_ = "stopping FAILED: " + (std::string)e.what();
02344   }
02345   catch (...) {
02346     failed=true;
02347     reasonForFailedState_= "STOP failed: unknown exception";
02348   }
02349 
02350   if (failed) {
02351     LOG4CPLUS_WARN(getApplicationLogger(),"STOP failed: return point of the stopping call reached. pid:" << getpid());
02352     localLog(reasonForFailedState_);
02353     fsm_.fireFailed(reasonForFailedState_,this);
02354   }
02355 
02356   LOG4CPLUS_INFO(getApplicationLogger(),"Finished stopping!");
02357   localLog("-I- Stop completed");
02358   return false;
02359 }
02360 
02361 void FUEventProcessor::stopSlavesAndAcknowledge()
02362 {
02363   MsgBuf msg(0,MSQM_MESSAGE_TYPE_STOP);
02364   MsgBuf msg1(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_STOP);
02365 
02366   std::vector<bool> processes_to_stop(nbSubProcesses_.value_,false);
02367   for(unsigned int i = 0; i < subs_.size(); i++)
02368     {
02369       pthread_mutex_lock(&stop_lock_);
02370       if(subs_[i].alive()>0){
02371         processes_to_stop[i] = true;
02372         subs_[i].post(msg,false);
02373       }
02374       pthread_mutex_unlock(&stop_lock_);
02375     }
02376   for(unsigned int i = 0; i < subs_.size(); i++)
02377     {
02378       pthread_mutex_lock(&stop_lock_);
02379       if(processes_to_stop[i]){
02380         try{
02381           subs_[i].rcv(msg1,false);
02382         }
02383         catch(evf::Exception &e){
02384           std::ostringstream ost;
02385           ost << "failed to get STOP - errno ->" << errno << " " << e.what(); 
02386           reasonForFailedState_ = ost.str();
02387           LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02388           //      fsm_.fireFailed(reasonForFailedState_,this);
02389           localLog(reasonForFailedState_);
02390           pthread_mutex_unlock(&stop_lock_);
02391           continue;
02392         }
02393       }
02394       else {
02395         pthread_mutex_unlock(&stop_lock_);
02396         continue;
02397       }
02398       pthread_mutex_unlock(&stop_lock_);
02399       if(msg1->mtype==MSQS_MESSAGE_TYPE_STOP)
02400         while(subs_[i].alive()>0) ::usleep(10000);
02401       subs_[i].disconnect();
02402     }
02403   //  subs_.clear();
02404 
02405 }
02406 
02407 void FUEventProcessor::microState(xgi::Input *in,xgi::Output *out)
02408 {
02409   std::string urn = getApplicationDescriptor()->getURN();
02410   try{
02411     evtProcessor_.stateNameFromIndex(0);
02412     evtProcessor_.moduleNameFromIndex(0);
02413   if(myProcess_) {std::cout << "microstate called for child! bail out" << std::endl; return;}
02414   *out << "<tr><td>" << fsm_.stateName()->toString() 
02415        << "</td><td>"<< (myProcess_ ? "S" : "M") <<"</td><td>" << nblive_ << "</td><td>"
02416        << nbdead_ << "</td><td><a href=\"/" << urn << "/procStat\">" << getpid() <<"</a></td>";
02417   evtProcessor_.microState(in,out);
02418   *out << "<td></td><td>" << nbTotalDQM_ 
02419        << "</td><td>" << evtProcessor_.getScalersUpdates() << "</td></tr>";
02420   if(nbSubProcesses_.value_!=0 && !myProcess_) 
02421     {
02422       pthread_mutex_lock(&start_lock_);
02423       for(unsigned int i = 0; i < subs_.size(); i++)
02424         {
02425           try{
02426             if(subs_[i].alive()>0)
02427               {
02428                 *out << "<tr><td  bgcolor=\"#00FF00\" id=\"a"
02429                      << i << "\">""Alive</td><td>S</td><td>"
02430                      << subs_[i].queueId() << "<td>" 
02431                      << subs_[i].queueStatus()<< "/"
02432                      << subs_[i].queueOccupancy() << "/"
02433                      << subs_[i].queuePidOfLastSend() << "/"
02434                      << subs_[i].queuePidOfLastReceive() 
02435                      << "</td><td><a id=\"p"<< i << "\" href=\"SubWeb?process=" 
02436                      << subs_[i].pid() << "&method=procStat\">" 
02437                      << subs_[i].pid()<<"</a></td>" //<< msg->mtext;
02438                      << "<td>" << evtProcessor_.stateNameFromIndex(subs_[i].params().Ms) << "</td><td>" 
02439                      << evtProcessor_.moduleNameFromIndex(subs_[i].params().ms) << "</td><td>" 
02440                      << subs_[i].params().nba << "/" << subs_[i].params().nbp 
02441                      << " (" << float(subs_[i].params().nba)/float(subs_[i].params().nbp)*100. <<"%)" 
02442                      << "</td><td>" << subs_[i].params().ls  << "/" << subs_[i].params().ls 
02443                      << "</td><td>" << subs_[i].params().ps 
02444                      << "</td><td" 
02445                      << ((subs_[i].params().eols<subs_[i].params().ls) ? " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")  << ">" 
02446                      << subs_[i].params().eols  
02447                      << "</td><td>" << subs_[i].params().dqm 
02448                      << "</td><td>" << subs_[i].params().trp << "</td>";
02449               }
02450             else 
02451               {
02452                 pthread_mutex_lock(&pickup_lock_);
02453                 *out << "<tr><td id=\"a"<< i << "\" ";
02454                 if(subs_[i].alive()==-1000)
02455                   *out << " bgcolor=\"#bbaabb\">NotInitialized";
02456                 else
02457                   *out << (subs_[i].alive()==0 ? ">Done" : " bgcolor=\"#FF0000\">Dead");
02458                 *out << "</td><td>S</td><td>"<< subs_[i].queueId() << "<td>" 
02459                      << subs_[i].queueStatus() << "/"
02460                      << subs_[i].queueOccupancy() << "/"
02461                      << subs_[i].queuePidOfLastSend() << "/"
02462                      << subs_[i].queuePidOfLastReceive() 
02463                      << "</td><td id=\"p"<< i << "\">"
02464                      <<subs_[i].pid()<<"</td><td colspan=\"5\">" << subs_[i].reasonForFailed();
02465                 if(subs_[i].alive()!=0 && subs_[i].alive()!=-1000) 
02466                   {
02467                     if(autoRestartSlaves_ && subs_[i].restartCount()<=2) 
02468                       *out << " will restart in " << subs_[i].countdown() << " s";
02469                     else if(autoRestartSlaves_)
02470                       *out << " reached maximum restart count";
02471                     else *out << " autoRestart is disabled ";
02472                   }
02473                 *out << "</td><td" 
02474                      << ((subs_[i].params().eols<subs_[i].params().ls) ? 
02475                          " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")  
02476                      << ">" 
02477                      << subs_[i].params().eols  
02478                      << "</td><td>" << subs_[i].params().dqm 
02479                      << "</td><td>" << subs_[i].params().trp << "</td>";
02480                 pthread_mutex_unlock(&pickup_lock_);
02481               }
02482             *out << "</tr>";
02483           }
02484           catch(evf::Exception &e){
02485             *out << "<tr><td id=\"a"<< i << "\" " 
02486                  <<"bgcolor=\"#FFFF00\">NotResp</td><td>S</td><td>"<< subs_[i].queueId() << "<td>" 
02487                  << subs_[i].queueStatus() << "/"
02488                  << subs_[i].queueOccupancy() << "/"
02489                  << subs_[i].queuePidOfLastSend() << "/"
02490                  << subs_[i].queuePidOfLastReceive() 
02491                  << "</td><td id=\"p"<< i << "\">"
02492                  <<subs_[i].pid()<<"</td>";
02493           }
02494         }
02495       pthread_mutex_unlock(&start_lock_); 
02496     }
02497   }
02498       catch(evf::Exception &e)
02499       {
02500         LOG4CPLUS_INFO(getApplicationLogger(),"evf::Exception caught in microstate - " << e.what());    
02501       }
02502     catch(cms::Exception &e)
02503       {
02504         LOG4CPLUS_INFO(getApplicationLogger(),"cms::Exception caught in microstate - " << e.what());    
02505       }
02506     catch(std::exception &e)
02507       {
02508         LOG4CPLUS_INFO(getApplicationLogger(),"std::Exception caught in microstate - " << e.what());    
02509       }
02510     catch(...)
02511       {
02512         LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception caught in microstate - ");    
02513       }
02514 
02515 }
02516 
02517 
02518 void FUEventProcessor::updater(xgi::Input *in,xgi::Output *out)
02519 {
02520   using namespace utils;
02521 
02522   *out << updaterStatic_;
02523   mDiv(out,"loads");
02524   uptime(out);
02525   cDiv(out);
02526   mDiv(out,"st",fsm_.stateName()->toString());
02527   mDiv(out,"ru",runNumber_.toString());
02528   mDiv(out,"nsl",nbSubProcesses_.value_);
02529   mDiv(out,"nsr",nbSubProcessesReporting_.value_);
02530   mDiv(out,"cl");
02531   *out << getApplicationDescriptor()->getClassName() 
02532        << (nbSubProcesses_.value_ > 0 ? "MP " : " ");
02533   cDiv(out);
02534   mDiv(out,"in",getApplicationDescriptor()->getInstance());
02535   if(fsm_.stateName()->toString() != "Halted" && fsm_.stateName()->toString() != "halting"){
02536     mDiv(out,"hlt");
02537     *out << "<a href=\"" << configString_.toString() << "\">HLT Config</a>";
02538     cDiv(out);
02539     *out << std::endl;
02540   }
02541   else
02542     mDiv(out,"hlt","Not yet...");
02543 
02544   mDiv(out,"sq",squidPresent_.toString());
02545   mDiv(out,"vwl",(supervising_ ? "Active" : "Not Initialized"));
02546   mDiv(out,"mwl",evtProcessor_.wlMonitoring());
02547   if(nbProcessed != 0 && nbAccepted != 0)
02548     {
02549       mDiv(out,"tt",((xdata::UnsignedInteger32*)nbProcessed)->value_);
02550       mDiv(out,"ac",((xdata::UnsignedInteger32*)nbAccepted)->value_);
02551     }
02552   else
02553     {
02554       mDiv(out,"tt",0);
02555       mDiv(out,"ac",0);
02556     }
02557   if(!myProcess_)
02558     mDiv(out,"swl",(wlSummarizeActive_ ? "Active" : "Inactive"));
02559   else
02560     mDiv(out,"swl",(wlScalersActive_ ? "Active" : "Inactive"));
02561 
02562   mDiv(out,"idi",iDieUrl_.value_);
02563   if(vp_!=0){
02564     mDiv(out,"vpi",(unsigned int) vp_);
02565     if(vulture_->hasStarted()>=0)
02566       mDiv(out,"vul","Prowling");
02567     else
02568       mDiv(out,"vul","Dead");
02569   }
02570   else{
02571     mDiv(out,"vul",(vulture_==0 ? "Nope" : "Hatching"));
02572   }    
02573   if(evtProcessor_){
02574     mDiv(out,"ll");
02575     *out << evtProcessor_.lastLumi().ls
02576          << "," << evtProcessor_.lastLumi().proc << "," << evtProcessor_.lastLumi().acc;
02577     cDiv(out);
02578   }
02579   mDiv(out,"lg");
02580   for(unsigned int i = logRingIndex_; i<logRingSize_; i++)
02581     *out << logRing_[i] << std::endl;
02582   if(logWrap_)
02583     for(unsigned int i = 0; i<logRingIndex_; i++)
02584       *out << logRing_[i] << std::endl;
02585   cDiv(out);
02586   mDiv(out,"cha");
02587   if(cpustat_) *out << cpustat_->getChart();
02588   cDiv(out);
02589 }
02590 
02591 void FUEventProcessor::procStat(xgi::Input *in, xgi::Output *out)
02592 {
02593   evf::utils::procStat(out);
02594 }
02595 
02596 void FUEventProcessor::sendMessageOverMonitorQueue(MsgBuf &buf)
02597 {
02598   if(myProcess_) myProcess_->postSlave(buf,true);
02599 }
02600 
02601 void FUEventProcessor::makeStaticInfo()
02602 {
02603   using namespace utils;
02604   std::ostringstream ost;
02605   mDiv(&ost,"ve");
02606   ost<< "$Revision: 1.151 $ (" << edm::getReleaseVersion() <<")";
02607   cDiv(&ost);
02608   mDiv(&ost,"ou",outPut_.toString());
02609   mDiv(&ost,"sh",hasShMem_.toString());
02610   mDiv(&ost,"mw",hasModuleWebRegistry_.toString());
02611   mDiv(&ost,"sw",hasServiceWebRegistry_.toString());
02612   
02613   xdata::Serializable *monsleep = 0;
02614   xdata::Serializable *lstimeout = 0;
02615   try{
02616     monsleep  = applicationInfoSpace_->find("monSleepSec");
02617     lstimeout = applicationInfoSpace_->find("lsTimeOut");
02618   }
02619   catch(xdata::exception::Exception e){
02620   }
02621   
02622   if(monsleep!=0)
02623     mDiv(&ost,"ms",monsleep->toString());
02624   if(lstimeout!=0)
02625     mDiv(&ost,"lst",lstimeout->toString());
02626   char cbuf[sizeof(struct utsname)];
02627   struct utsname* buf = (struct utsname*)cbuf;
02628   uname(buf);
02629   mDiv(&ost,"sysinfo");
02630   ost << buf->sysname << " " << buf->nodename 
02631       << " " << buf->release << " " << buf->version << " " << buf->machine;
02632   cDiv(&ost);
02633   updaterStatic_ = ost.str();
02634 }
02635 
02636 void FUEventProcessor::handleSignalSlave(int sig, siginfo_t* info, void* c)
02637 {
02638   //notify master
02639   sem_post(sigmon_sem_);
02640 
02641   //sleep until master takes action
02642   sleep(2);
02643 
02644   //set up alarm if handler deadlocks on unsafe actions
02645   alarm(5);
02646   
02647   std::cout << "--- Slave EP signal handler caught signal " << sig << " process id is " << info->si_pid <<" ---" << std::endl;
02648   std::cout << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl;
02649   std::cout << "--- Stacktrace follows --" << std::endl;
02650   std::ostringstream stacktr;
02651   toolbox::stacktrace(20,stacktr);
02652   std::cout << stacktr.str();
02653   if (!rlimit_coresize_changed_)
02654     std::cout << "--- Dumping core." <<  " --- " << std::endl;
02655   else
02656     std::cout << "--- Core dump count exceeded on this FU. ---"<<std::endl;
02657  
02658   std::string hasdump = "";
02659   if (rlimit_coresize_changed_) hasdump = " (core dump disabled) ";
02660 
02661   LOG4CPLUS_ERROR(getApplicationLogger(),    "--- Slave EP signal handler caught signal " << sig << ". process id is " << getpid() 
02662                                           << " on node " << toolbox::net::getHostName() << " ---" << std::endl
02663                                           << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl
02664                                           << "--- Stacktrace follows"<< hasdump << " ---" << std::endl << stacktr.str()
02665                                           );
02666 
02667   //re-raise signal with default handler (will cause core dump if enabled)
02668   raise(sig);
02669 }
02670 
02671 
// NOTE(review): presumably expands to the XDAQ instantiation hook for
// evf::FUEventProcessor -- confirm against the XDAQ macro definition.
02672 XDAQ_INSTANTIATOR_IMPL(evf::FUEventProcessor)