CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_4/src/EventFilter/Processor/src/FUEventProcessor.cc

Go to the documentation of this file.
00001 
00002 //
00003 // FUEventProcessor
00004 // ----------------
00005 //
00007 
00008 #include "FUEventProcessor.h"
00009 #include "procUtils.h"
00010 #include "EventFilter/Utilities/interface/CPUStat.h"
00011 #include "EventFilter/Utilities/interface/RateStat.h"
00012 
00013 #include "EventFilter/Utilities/interface/Exception.h"
00014 
00015 #include "EventFilter/Message2log4cplus/interface/MLlog4cplus.h"
00016 #include "EventFilter/Modules/interface/FUShmDQMOutputService.h"
00017 #include "EventFilter/Utilities/interface/ServiceWebRegistry.h"
00018 #include "EventFilter/Utilities/interface/ServiceWeb.h"
00019 #include "EventFilter/Utilities/interface/ModuleWeb.h"
00020 #include "EventFilter/Utilities/interface/ModuleWebRegistry.h"
00021 #include "EventFilter/Modules/src/FUShmOutputModule.h"
00022 
00023 #include "FWCore/PluginManager/interface/ProblemTracker.h"
00024 #include "FWCore/PluginManager/interface/PresenceFactory.h"
00025 #include "FWCore/Utilities/interface/Presence.h"
00026 #include "FWCore/Utilities/interface/Exception.h"
00027 #include "FWCore/Version/interface/GetReleaseVersion.h"
00028 #include "FWCore/ServiceRegistry/interface/Service.h"
00029 
00030 #include "toolbox/BSem.h" 
00031 #include "toolbox/Runtime.h"
00032 #include "toolbox/stacktrace.h"
00033 #include "toolbox/net/Utils.h"
00034 
00035 #include <boost/tokenizer.hpp>
00036 
00037 #include "xcept/tools.h"
00038 #include "xgi/Method.h"
00039 
00040 #include "cgicc/CgiDefs.h"
00041 #include "cgicc/Cgicc.h"
00042 #include "cgicc/FormEntry.h"
00043 
00044 
00045 #include <sys/wait.h>
00046 #include <sys/utsname.h>
00047 #include <sys/mman.h>
00048 #include <signal.h>
00049 
00050 #include <typeinfo>
00051 #include <stdlib.h>
00052 #include <stdio.h>
00053 #include <errno.h>
00054 
00055 
00056 using namespace evf;
00057 using namespace cgicc;
00058 namespace toolbox {
00059   namespace mem {
          // Declaration of a global binary semaphore pointer that is defined
          // elsewhere (presumably inside the XDAQ toolbox library, since it is
          // only declared `extern` here). enabling() deletes and re-creates it in
          // forked child processes (the "dirty hack" there), which is why it is
          // re-declared in this file. Name and type must match the definition.
00060     extern toolbox::BSem * _s_mutex_ptr_; 
00061   }
00062 }
00063 
00064 //signal handler (global)
      // Global state and C-style handlers used for signals caught in this process.
00065 namespace evf {
        // Pointer to the FUEventProcessor instance; assigned in the
        // FUEventProcessor constructor so the plain handlers below can reach it.
00066   FUEventProcessor * FUInstancePtr_;
        // Number of the last signal seen by evfep_sighandler (0 until then, via
        // static zero-initialization); evfep_alarmhandler re-raises it.
        // NOTE(review): this is written from a signal handler but typed `int`;
        // `volatile sig_atomic_t` is the portable type for such a flag - check
        // any header declaration of this variable before changing it.
00067   int evfep_raised_signal;
        // SA_SIGINFO-style handler: records the signal number and forwards the
        // signal to FUEventProcessor::handleSignalSlave on the global instance.
        // NOTE(review): assumes FUInstancePtr_ is non-null whenever this handler
        // is installed (it is set at the top of the constructor).
00068   void evfep_sighandler(int sig, siginfo_t* info, void* c)
00069   {
00070     evfep_raised_signal=sig;
00071     FUInstancePtr_->handleSignalSlave(sig, info, c);
00072   }
        // Alarm handler: if a signal was recorded earlier, restore that signal's
        // default disposition and raise it again, so the default action runs.
        // Presumably used as a deadline after handleSignalSlave - confirm with
        // the code that arms the alarm.
00073   void evfep_alarmhandler(int sig, siginfo_t* info, void* c)
00074   {
00075     if (evfep_raised_signal) {
00076       signal(evfep_raised_signal,SIG_DFL);
00077       raise(evfep_raised_signal);
00078     }
00079   }
00080 }
00081 
00083 // construction/destruction
00085 
00086 //______________________________________________________________________________
00087 FUEventProcessor::FUEventProcessor(xdaq::ApplicationStub *s) 
00088   : xdaq::Application(s)
00089   , fsm_(this)
00090   , log_(getApplicationLogger())
00091   , evtProcessor_(log_, getApplicationDescriptor()->getInstance())
00092   , runNumber_(0)
00093   , epInitialized_(false)
00094   , outPut_(true)
00095   , autoRestartSlaves_(false)
00096   , slaveRestartDelaySecs_(10)
00097   , hasShMem_(true)
00098   , hasPrescaleService_(true)
00099   , hasModuleWebRegistry_(true)
00100   , hasServiceWebRegistry_(true)
00101   , isRunNumberSetter_(true)
00102   , iDieStatisticsGathering_(false)
00103   , outprev_(true)
00104   , exitOnError_(true)
00105   , reasonForFailedState_()
00106   , squidnet_(3128,"https://localhost:8000/RELEASE-NOTES.txt")
00107   , logRing_(logRingSize_)
00108   , logRingIndex_(logRingSize_)
00109   , logWrap_(false)
00110   , nbSubProcesses_(0)
00111   , nbSubProcessesReporting_(0)
00112   , forkInEDM_(true)
00113   , nblive_(0)
00114   , nbdead_(0)
00115   , nbTotalDQM_(0)
00116   , wlReceiving_(0)
00117   , asReceiveMsgAndExecute_(0)
00118   , receiving_(false) 
00119   , wlReceivingMonitor_(0)
00120   , asReceiveMsgAndRead_(0)
00121   , receivingM_(false)
00122   , myProcess_(0)
00123   , wlSupervising_(0)
00124   , asSupervisor_(0)
00125   , supervising_(false)
00126   , wlSignalMonitor_(0)
00127   , asSignalMonitor_(0)
00128   , signalMonitorActive_(false)
00129   , monitorInfoSpace_(0)
00130   , monitorLegendaInfoSpace_(0)
00131   , applicationInfoSpace_(0)
00132   , nbProcessed(0)
00133   , nbAccepted(0)
00134   , scalersInfoSpace_(0)
00135   , scalersLegendaInfoSpace_(0)
00136   , wlScalers_(0)
00137   , asScalers_(0)
00138   , wlScalersActive_(false)
00139   , scalersUpdates_(0)
00140   , wlSummarize_(0)
00141   , asSummarize_(0)
00142   , wlSummarizeActive_(false)
00143   , superSleepSec_(1)
00144   , iDieUrl_("none")
00145   , vulture_(0)
00146   , vp_(0)
00147   , cpustat_(0)
00148   , ratestat_(0)
00149   , mwrRef_(nullptr)
00150   , sorRef_(nullptr)
00151   , master_message_prg_(0,MSQM_MESSAGE_TYPE_PRG)
00152   , master_message_prr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_PRR)
00153   , slave_message_prr_(sizeof(prg),MSQS_MESSAGE_TYPE_PRR)
00154   , master_message_trr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_TRR)
00155   , edm_init_done_(true)
00156   , crashesThisRun_(false)
00157   , rlimit_coresize_changed_(false)
00158   , crashesToDump_(2)
00159   , sigmon_sem_(0)
00160 {
00161   using namespace utils;
00162 
00163   FUInstancePtr_=this;
00164 
00165   names_.push_back("nbProcessed"    );
00166   names_.push_back("nbAccepted"     );
00167   names_.push_back("epMacroStateInt");
00168   names_.push_back("epMicroStateInt");
00169   // create pipe for web communication
00170   int retpipe = pipe(anonymousPipe_);
00171   if(retpipe != 0)
00172         LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to create pipe");
00173   // check squid presence
00174   squidPresent_ = squidnet_.check();
00175   //pass application parameters to FWEPWrapper
00176   evtProcessor_.setAppDesc(getApplicationDescriptor());
00177   evtProcessor_.setAppCtxt(getApplicationContext());
00178   // bind relevant callbacks to finite state machine
00179   fsm_.initialize<evf::FUEventProcessor>(this);
00180   
00181   //set sourceId_
00182   url_ =
00183     getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00184     getApplicationDescriptor()->getURN();
00185   class_   =getApplicationDescriptor()->getClassName();
00186   instance_=getApplicationDescriptor()->getInstance();
00187   sourceId_=class_.toString()+"_"+instance_.toString();
00188   LOG4CPLUS_INFO(getApplicationLogger(),sourceId_ <<" constructor"         );
00189   LOG4CPLUS_INFO(getApplicationLogger(),"CMSSW_BASE:"<<getenv("CMSSW_BASE"));
00190   
00191   getApplicationDescriptor()->setAttribute("icon", "/evf/images/epicon.jpg");
00192   
00193   xdata::InfoSpace *ispace = getApplicationInfoSpace();
00194   applicationInfoSpace_ = ispace;
00195 
00196   // default configuration
00197   ispace->fireItemAvailable("parameterSet",         &configString_                );
00198   ispace->fireItemAvailable("epInitialized",        &epInitialized_               );
00199   ispace->fireItemAvailable("stateName",             fsm_.stateName()             );
00200   ispace->fireItemAvailable("runNumber",            &runNumber_                   );
00201   ispace->fireItemAvailable("outputEnabled",        &outPut_                      );
00202 
00203   ispace->fireItemAvailable("hasSharedMemory",      &hasShMem_);
00204   ispace->fireItemAvailable("hasPrescaleService",   &hasPrescaleService_          );
00205   ispace->fireItemAvailable("hasModuleWebRegistry", &hasModuleWebRegistry_        );
00206   ispace->fireItemAvailable("hasServiceWebRegistry", &hasServiceWebRegistry_      );
00207   ispace->fireItemAvailable("isRunNumberSetter",    &isRunNumberSetter_           );
00208   ispace->fireItemAvailable("iDieStatisticsGathering",   &iDieStatisticsGathering_);
00209   ispace->fireItemAvailable("rcmsStateListener",     fsm_.rcmsStateListener()     );
00210   ispace->fireItemAvailable("foundRcmsStateListener",fsm_.foundRcmsStateListener());
00211   ispace->fireItemAvailable("nbSubProcesses",       &nbSubProcesses_              );
00212   ispace->fireItemAvailable("nbSubProcessesReporting",&nbSubProcessesReporting_   );
00213   ispace->fireItemAvailable("forkInEDM"             ,&forkInEDM_                  );
00214   ispace->fireItemAvailable("superSleepSec",        &superSleepSec_               );
00215   ispace->fireItemAvailable("autoRestartSlaves",    &autoRestartSlaves_           );
00216   ispace->fireItemAvailable("slaveRestartDelaySecs",&slaveRestartDelaySecs_       );
00217   ispace->fireItemAvailable("iDieUrl",              &iDieUrl_                     );
00218   ispace->fireItemAvailable("crashesToDump"         ,&crashesToDump_              );
00219 
00220   // Add infospace listeners for exporting data values
00221   getApplicationInfoSpace()->addItemChangedListener("parameterSet",        this);
00222   getApplicationInfoSpace()->addItemChangedListener("outputEnabled",       this);
00223 
00224   // findRcmsStateListener
00225   fsm_.findRcmsStateListener();
00226   
00227   // initialize monitoring infospace
00228 
00229   std::string monInfoSpaceName="evf-eventprocessor-status-monitor";
00230   toolbox::net::URN urn = this->createQualifiedInfoSpace(monInfoSpaceName);
00231   monitorInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00232 
00233   std::string monLegendaInfoSpaceName="evf-eventprocessor-status-legenda";
00234   urn = this->createQualifiedInfoSpace(monLegendaInfoSpaceName);
00235   monitorLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00236 
00237   
00238   monitorInfoSpace_->fireItemAvailable("url",                      &url_            );
00239   monitorInfoSpace_->fireItemAvailable("class",                    &class_          );
00240   monitorInfoSpace_->fireItemAvailable("instance",                 &instance_       );
00241   monitorInfoSpace_->fireItemAvailable("runNumber",                &runNumber_      );
00242   monitorInfoSpace_->fireItemAvailable("stateName",                 fsm_.stateName()); 
00243 
00244   monitorInfoSpace_->fireItemAvailable("squidPresent",             &squidPresent_   );
00245 
00246   std::string scalersInfoSpaceName="evf-eventprocessor-scalers-monitor";
00247   urn = this->createQualifiedInfoSpace(scalersInfoSpaceName);
00248   scalersInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00249 
00250   std::string scalersLegendaInfoSpaceName="evf-eventprocessor-scalers-legenda";
00251   urn = this->createQualifiedInfoSpace(scalersLegendaInfoSpaceName);
00252   scalersLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00253 
00254 
00255 
00256   evtProcessor_.setScalersInfoSpace(scalersInfoSpace_,scalersLegendaInfoSpace_);
00257   scalersInfoSpace_->fireItemAvailable("instance", &instance_);
00258 
00259   evtProcessor_.setApplicationInfoSpace(ispace);
00260   evtProcessor_.setMonitorInfoSpace(monitorInfoSpace_,monitorLegendaInfoSpace_);
00261   evtProcessor_.publishConfigAndMonitorItems(nbSubProcesses_.value_!=0);
00262 
00263   //subprocess state vectors for MP
00264   monitorInfoSpace_->fireItemAvailable("epMacroStateInt",             &spMStates_); 
00265   monitorInfoSpace_->fireItemAvailable("epMicroStateInt",             &spmStates_); 
00266   
00267   // Bind web interface
00268   xgi::bind(this, &FUEventProcessor::css,              "styles.css");
00269   xgi::bind(this, &FUEventProcessor::defaultWebPage,   "Default"   );
00270   xgi::bind(this, &FUEventProcessor::spotlightWebPage, "Spotlight" );
00271   xgi::bind(this, &FUEventProcessor::scalersWeb,       "scalersWeb");
00272   xgi::bind(this, &FUEventProcessor::pathNames,        "pathNames" );
00273   xgi::bind(this, &FUEventProcessor::subWeb,           "SubWeb"    );
00274   xgi::bind(this, &FUEventProcessor::getSlavePids,     "getSlavePids");
00275   xgi::bind(this, &FUEventProcessor::moduleWeb,        "moduleWeb" );
00276   xgi::bind(this, &FUEventProcessor::serviceWeb,       "serviceWeb");
00277   xgi::bind(this, &FUEventProcessor::microState,       "microState");
00278   xgi::bind(this, &FUEventProcessor::updater,          "updater"   );
00279   xgi::bind(this, &FUEventProcessor::procStat,         "procStat"  );
00280 
00281   // instantiate the plugin manager, not referenced here after!
00282 
00283   edm::AssertHandler ah;
00284 
00285   //create Message Service thread and pass ownership to auto_ptr that we destroy before fork
00286   try{
00287     LOG4CPLUS_DEBUG(getApplicationLogger(),
00288                     "Trying to create message service presence ");
00289     edm::PresenceFactory *pf = edm::PresenceFactory::get();
00290     if(pf != 0) {
00291       messageServicePresence_= pf->makePresence("MessageServicePresence");
00292     }
00293     else {
00294       LOG4CPLUS_ERROR(getApplicationLogger(),
00295                       "Unable to create message service presence ");
00296     }
00297   } 
00298   catch(...) {
00299     LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
00300   }
00301   ML::MLlog4cplus::setAppl(this);      
00302 
00303   typedef std::set<xdaq::ApplicationDescriptor*> AppDescSet_t;
00304   typedef AppDescSet_t::iterator                 AppDescIter_t;
00305   
00306   AppDescSet_t rcms=
00307     getApplicationContext()->getDefaultZone()->
00308     getApplicationDescriptors("RCMSStateListener");
00309   if(rcms.size()==0) 
00310     {
00311       LOG4CPLUS_WARN(getApplicationLogger(),
00312                        "MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
00313       //        localLog("-W- MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
00314     }
00315   else
00316     {
00317       AppDescIter_t it = rcms.begin();
00318       evtProcessor_.setRcms(*it);
00319     }
00320   pthread_mutex_init(&start_lock_,0);
00321   pthread_mutex_init(&stop_lock_,0);
00322   pthread_mutex_init(&pickup_lock_,0);
00323 
00324   forkInfoObj_=nullptr;
00325   pthread_mutex_init(&forkObjLock_,0);
00326   makeStaticInfo();
00327   startSupervisorLoop();  
00328 
00329   if(vulture_==0) vulture_ = new Vulture(true);
00330 
00332 
00333   AppDescSet_t setOfiDie=
00334     getApplicationContext()->getDefaultZone()->
00335     getApplicationDescriptors("evf::iDie");
00336   
00337   for (AppDescIter_t it=setOfiDie.begin();it!=setOfiDie.end();++it)
00338     if ((*it)->getInstance()==0) // there has to be only one instance of iDie
00339       iDieUrl_ = (*it)->getContextDescriptor()->getURL() + "/" + (*it)->getURN();
00340 
00341   //save default core file size
00342   getrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00343 
00344   //prepare IPC semaphore for getting the workloop waked up on signal caught in slaves
00345   #ifdef linux
00346   if (sigmon_sem_==0) {
00347     sigmon_sem_ = (sem_t*)mmap(NULL, sizeof(sem_t),
00348         PROT_READ | PROT_WRITE,
00349         MAP_ANONYMOUS | MAP_SHARED, 0, 0);
00350     if (!sigmon_sem_) {
00351       perror("mmap error\n");
00352       std::cout << "mmap error"<<std::endl;
00353     }
00354     else
00355       sem_init(sigmon_sem_,true,0);
00356   }
00357   #endif
00358 }
00359 //___________here ends the *huge* constructor___________________________________
00360 
00361 
00362 //______________________________________________________________________________
00363 FUEventProcessor::~FUEventProcessor()
00364 {
00365   // no longer needed since the wrapper is a member of the class and one can rely on 
00366   // implicit destructor - to be revised - at any rate the most common exit path is via "kill"...
00367   //  if (evtProcessor_) delete evtProcessor_;
00368   if(vulture_ != 0) delete vulture_;
00369 }
00370 
00371 
00373 // implementation of member functions
00375 
00376 
00377 //______________________________________________________________________________
00378 bool FUEventProcessor::configuring(toolbox::task::WorkLoop* wl)
00379 {
00380 //   std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
00381 //          << ((instance_.value_==0) ? 0x8 : 0) << " "
00382 //          << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
00383 //          << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
00384 //          << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
00385   unsigned short smap 
00386     = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00387     + (((instance_.value_%80)==0) ? 0x8 : 0) // have at least one legend per slice
00388     + (hasServiceWebRegistry_.value_ ? 0x4 : 0) 
00389     + (hasModuleWebRegistry_.value_ ? 0x2 : 0) 
00390     + (hasPrescaleService_.value_ ? 0x1 : 0);
00391   if(nbSubProcesses_.value_==0) 
00392     {
00393       spMStates_.setSize(1); 
00394       spmStates_.setSize(1); 
00395     }
00396   else
00397     {
00398       spMStates_.setSize(nbSubProcesses_.value_);
00399       spmStates_.setSize(nbSubProcesses_.value_);
00400       for(unsigned int i = 0; i < spMStates_.size(); i++)
00401         {
00402           spMStates_[i] = edm::event_processor::sInit; 
00403           spmStates_[i] = 0; 
00404         }
00405     }
00406   try {
00407     LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ...");
00408     std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00409     epInitialized_=true;
00410     if(evtProcessor_)
00411       {
00412         //get ref of mwr
00413         mwrRef_ = evtProcessor_.getModuleWebRegistry();
00414         sorRef_ = evtProcessor_.getShmOutputModuleRegistry();
00415         // moved to wrapper class
00416         configuration_ = evtProcessor_.configuration();
00417         if(nbSubProcesses_.value_==0) evtProcessor_.startMonitoringWorkLoop(); 
00418         evtProcessor_->beginJob(); 
00419         if(cpustat_) {delete cpustat_; cpustat_=0;}
00420         cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00421                                nbSubProcesses_.value_,
00422                                instance_.value_,
00423                                iDieUrl_.value_);
00424         if(ratestat_) {delete ratestat_; ratestat_=0;}
00425         ratestat_ = new RateStat(iDieUrl_.value_);
00426         if(iDieStatisticsGathering_.value_){
00427           try{
00428             cpustat_->sendLegenda(evtProcessor_.getmicromap());
00429             xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00430             if(legenda !=0){
00431               std::string slegenda = ((xdata::String*)legenda)->value_;
00432               ratestat_->sendLegenda(slegenda);
00433             }
00434 
00435           }
00436           catch(evf::Exception &e){
00437             LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
00438                            << e.what());
00439           }
00440         }
00441         
00442         fsm_.fireEvent("ConfigureDone",this);
00443         LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!");
00444         localLog("-I- Configuration completed");
00445 
00446       }
00447   }
00448   catch (xcept::Exception &e) {
00449     reasonForFailedState_ = "configuring FAILED: " + (std::string)e.what();
00450     fsm_.fireFailed(reasonForFailedState_,this);
00451     localLog(reasonForFailedState_);
00452   }
00453   catch(cms::Exception &e) {
00454     reasonForFailedState_ = e.explainSelf();
00455     fsm_.fireFailed(reasonForFailedState_,this);
00456     localLog(reasonForFailedState_);
00457   }    
00458   catch(std::exception &e) {
00459     reasonForFailedState_ = e.what();
00460     fsm_.fireFailed(reasonForFailedState_,this);
00461     localLog(reasonForFailedState_);
00462   }
00463   catch(...) {
00464     fsm_.fireFailed("Unknown Exception",this);
00465   }
00466 
00467   if(vulture_!=0 && vp_ == 0) vp_ = vulture_->makeProcess();
00468 
00469   return false;
00470 }
00471 
00472 
00473 
00474 
00475 //______________________________________________________________________________
00476 bool FUEventProcessor::enabling(toolbox::task::WorkLoop* wl)
00477 {
00478   nbTotalDQM_ = 0;
00479   scalersUpdates_ = 0;
00480   idleProcStats_ = 0;
00481   allProcStats_ = 0;
00482 //   std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
00483 //          << ((instance_.value_==0) ? 0x8 : 0) << " "
00484 //          << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
00485 //          << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
00486 //          << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
00487   unsigned short smap 
00488     = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00489     + (((instance_.value_%80)==0) ? 0x8 : 0) // have at least one legend per slice
00490     + (hasServiceWebRegistry_.value_ ? 0x4 : 0) 
00491     + (hasModuleWebRegistry_.value_ ? 0x2 : 0) 
00492     + (hasPrescaleService_.value_ ? 0x1 : 0);
00493 
00494   LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling...");
00495   
00496   //reset core limit size
00497   if (rlimit_coresize_changed_)
00498     setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00499   rlimit_coresize_changed_=false;
00500   crashesThisRun_=0;
00501   //recreate signal monitor sem
00502   sem_destroy(sigmon_sem_);
00503   sem_init(sigmon_sem_,true,0);
00504 
00505   if(!epInitialized_){
00506     evtProcessor_.forceInitEventProcessorMaybe();
00507   }
00508   std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00509   
00510   mwrRef_ = evtProcessor_.getModuleWebRegistry();
00511   sorRef_ = evtProcessor_.getShmOutputModuleRegistry();
00512 
00513   if(!epInitialized_){
00514     evtProcessor_->beginJob(); 
00515     if(cpustat_) {delete cpustat_; cpustat_=0;}
00516     cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00517                            nbSubProcesses_.value_,
00518                            instance_.value_,
00519                            iDieUrl_.value_);
00520     if(ratestat_) {delete ratestat_; ratestat_=0;}
00521     ratestat_ = new RateStat(iDieUrl_.value_);
00522     if(iDieStatisticsGathering_.value_){
00523       try
00524         {
00525         cpustat_->sendLegenda(evtProcessor_.getmicromap());
00526         xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00527         if(legenda !=0)
00528           {
00529             std::string slegenda = ((xdata::String*)legenda)->value_;
00530             ratestat_->sendLegenda(slegenda);
00531           }
00532         }
00533       catch(evf::Exception &e)
00534         {
00535           LOG4CPLUS_INFO(getApplicationLogger(),"could not send legenda"
00536                          << e.what());
00537         }
00538       catch (xcept::Exception& e) {
00539         LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda."
00540                         << e.what());
00541       }
00542     }
00543     epInitialized_ = true;
00544   }
00545   configuration_ = evtProcessor_.configuration(); // get it again after init has been carried out...
00546   evtProcessor_.resetLumiSectionReferenceIndex();
00547   //classic appl will return here 
00548   if(nbSubProcesses_.value_==0) return enableClassic();
00549   //protect manipulation of subprocess array
00550   pthread_mutex_lock(&start_lock_);
00551   pthread_mutex_lock(&pickup_lock_);
00552   subs_.clear();
00553   subs_.resize(nbSubProcesses_.value_); // this should not be necessary
00554   pid_t retval = -1;
00555 
00556   for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00557     {
00558       subs_[i]=SubProcess(i,retval); //this will replace all the scattered variables
00559     }
00560   pthread_mutex_unlock(&pickup_lock_);
00561 
00562   pthread_mutex_unlock(&start_lock_);
00563 
00564   //set expected number of child EP's for the Init message(s) sent to the SM
00565   try {
00566     if (sorRef_) {
00567       unsigned int nbExpectedEPWriters = nbSubProcesses_.value_;
00568       if (nbExpectedEPWriters==0) nbExpectedEPWriters=1;//master instance processing
00569       std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
00570       for (unsigned int i=0;i<shmOutputs.size();i++) {
00571         shmOutputs[i]->setNExpectedEPs(nbExpectedEPWriters);
00572       }
00573     }
00574   }
00575   catch (...)
00576   {
00577     LOG4CPLUS_ERROR(getApplicationLogger(),"Thrown Exception while setting nExpectedEPs in shmOutputs");
00578   }
00579 
00580   //use new method if configured
00581   edm_init_done_=true;
00582   if (forkInEDM_.value_) {
00583     edm_init_done_=false;
00584     enableForkInEDM();
00585   }
00586   else
00587     for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00588     {
00589       retval = subs_[i].forkNew();
00590       if(retval==0)
00591         {
00592           myProcess_ = &subs_[i];
00593           // dirty hack: delete/recreate global binary semaphore for later use in child
00594           delete toolbox::mem::_s_mutex_ptr_;
00595           toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
00596           int retval = pthread_mutex_destroy(&stop_lock_);
00597           if(retval != 0) perror("error");
00598           retval = pthread_mutex_init(&stop_lock_,0);
00599           if(retval != 0) perror("error");
00600           fsm_.disableRcmsStateNotification();
00601           
00602           return enableMPEPSlave();
00603           // the loop is broken in the child 
00604         }
00605     }
00606 
00607   if (forkInEDM_.value_) {
00608 
00609     edm::event_processor::State st;
00610     while (!edm_init_done_) {
00611       usleep(10000);
00612       st = evtProcessor_->getState();
00613       if (st==edm::event_processor::sError || st==edm::event_processor::sInvalid) break;
00614     }
00615     st = evtProcessor_->getState();
00616     //error handling: EP must fork during sRunning
00617     if (st!=edm::event_processor::sRunning) {
00618       reasonForFailedState_ = std::string("Master edm::EventProcessor in state ") + evtProcessor_->stateName(st);
00619       localLog(reasonForFailedState_);
00620       fsm_.fireFailed(reasonForFailedState_,this);
00621       return false;
00622     }
00623 
00624     sleep(1);
00625     startSummarizeWorkLoop();
00626     startSignalMonitorWorkLoop();//only with new forking
00627     vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00628 
00629     //enable after we are done with conditions loading and forking
00630     LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00631     fsm_.fireEvent("EnableDone",this);
00632     localLog("-I- Start completed");
00633     return false;
00634   }
00635 
00636   startSummarizeWorkLoop();
00637   vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00638   LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00639   fsm_.fireEvent("EnableDone",this);
00640   localLog("-I- Start completed");
00641   return false;
00642 }
00643 
00644 bool FUEventProcessor::doEndRunInEDM() {
00645   //taking care that EP in master is in stoppable state
00646   if (forkInfoObj_) {
00647 
00648     int count = 30;
00649     bool waitedForEDM=false;
00650     while (!edm_init_done_ && count) {
00651       ::sleep(1);
00652       if (count%5==0)
00653         LOG4CPLUS_WARN(log_,"MASTER EP: Stopping while EP busy in beginRun. waiting " <<count<< "sec");
00654       count--;
00655       waitedForEDM=true;
00656     }
00657     //sleep a few more seconds it was early stop
00658     if (waitedForEDM) sleep(5);
00659 
00660     //if (count==0) fsm_.fireFailed("failed to stop Master EP",this);
00661 
00662     if (evtProcessor_->getState()==edm::event_processor::sJobReady)
00663       return true;//we are already done
00664 
00665     forkInfoObj_->lock();
00666     forkInfoObj_->stopCondition=true;
00667     sem_post(forkInfoObj_->control_sem_);
00668     forkInfoObj_->unlock();
00669 
00670     count = 30;
00671 
00672     edm::event_processor::State st;
00673     while(count--) {
00674       st = evtProcessor_->getState();
00675       if (st==edm::event_processor::sRunning) ::usleep(100000);
00676       else if (st==edm::event_processor::sStopping || st==edm::event_processor::sJobReady) {
00677         break;
00678       }
00679       else {
00680         std::ostringstream ost;
00681         ost << "Master edm::EventProcessor is in state "<< evtProcessor_->stateName(st) << " while stopping";
00682         LOG4CPLUS_ERROR(getApplicationLogger(),ost.str());
00683         fsm_.fireFailed(ost.str(),this);
00684         return false;
00685       }
00686       if (count%5==0 && st==edm::event_processor::sRunning && !forkInfoObj_->receivedStop_) {
00687         forkInfoObj_->lock();
00688         forkInfoObj_->stopCondition=true;
00689         sem_post(forkInfoObj_->control_sem_);
00690         forkInfoObj_->unlock();
00691         LOG4CPLUS_WARN(getApplicationLogger(),
00692           "Master edm::EventProcessor still running after "<< (30-count-1) << " seconds. \"sem_post\" was executed again" );
00693       }
00694     }
00695     if (count<0) {
00696       std::ostringstream ost;
00697       if (!forkInfoObj_->receivedStop_)
00698         ost << "Timeout waiting for Master edm::EventProcessor to go stopping state "
00699             << evtProcessor_->stateName(st) << ": input source did not receive stop signal!";
00700       else
00701         ost << "Timeout waiting for Master edm::EventProcessor to go stopping state "<<evtProcessor_->stateName(st);
00702       LOG4CPLUS_ERROR(getApplicationLogger(),ost.str());
00703       fsm_.fireFailed(ost.str(),this);
00704       return false;
00705     }
00706   }
00707   return true;
00708 }
00709 
00710 //______________________________________________________________________________
00711 bool FUEventProcessor::stopping(toolbox::task::WorkLoop* wl)
00712 {
00713   setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00714   if(nbSubProcesses_.value_!=0) {
00715     stopSlavesAndAcknowledge();
00716     if (forkInEDM_.value_) {
00717             //only in new forking for now
00718             sem_post(sigmon_sem_);
00719             doEndRunInEDM();
00720     }
00721   }
00722   vulture_->stop();
00723 
00724   if (forkInEDM_.value_) {
00725     //shared memory was already disconnected in master
00726     bool tmpHasShMem_=hasShMem_;
00727     hasShMem_=false;
00728     stopClassic();
00729     hasShMem_=tmpHasShMem_;
00730     return false;
00731   }
00732   stopClassic();
00733   return false;
00734 }
00735 
00736 
00737 //______________________________________________________________________________
00738 bool FUEventProcessor::halting(toolbox::task::WorkLoop* wl)
00739 {
00740   LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ...");
00741   setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00742   if(nbSubProcesses_.value_!=0) { 
00743     stopSlavesAndAcknowledge();
00744     if (forkInEDM_.value_) {
00745             sem_post(sigmon_sem_);
00746             doEndRunInEDM();
00747     }
00748   }
00749   try{
00750     evtProcessor_.stopAndHalt();
00751     //cleanup forking variables
00752     if (forkInfoObj_) {
00753       delete forkInfoObj_;
00754       forkInfoObj_=0;
00755     }
00756   }
00757   catch (evf::Exception &e) {
00758     reasonForFailedState_ = "halting FAILED: " + (std::string)e.what();
00759     localLog(reasonForFailedState_);
00760     fsm_.fireFailed(reasonForFailedState_,this);
00761   }
00762   //  if(hasShMem_) detachDqmFromShm();
00763 
00764   LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!");
00765   fsm_.fireEvent("HaltDone",this);
00766 
00767   localLog("-I- Halt completed");
00768   return false;
00769 }
00770 
00771 
00772 //______________________________________________________________________________
00773 xoap::MessageReference FUEventProcessor::fsmCallback(xoap::MessageReference msg)
00774   throw (xoap::exception::Exception)
00775 {
00776   return fsm_.commandCallback(msg);
00777 }
00778 
00779 
00780 //______________________________________________________________________________
00781 void FUEventProcessor::actionPerformed(xdata::Event& e)
00782 {
00783 
00784   if (e.type()=="ItemChangedEvent" && fsm_.stateName()->toString()!="Halted") {
00785     std::string item = dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00786     
00787     if ( item == "parameterSet") {
00788       LOG4CPLUS_WARN(getApplicationLogger(),
00789                      "HLT Menu changed, force reinitialization of EventProcessor");
00790       epInitialized_ = false;
00791     }
00792     if ( item == "outputEnabled") {
00793       if(outprev_ != outPut_) {
00794         LOG4CPLUS_WARN(getApplicationLogger(),
00795                        (outprev_ ? "Disabling " : "Enabling ")<<"global output");
00796         evtProcessor_->enableEndPaths(outPut_);
00797         outprev_ = outPut_;
00798       }
00799     }
00800     if (item == "globalInputPrescale") {
00801       LOG4CPLUS_WARN(getApplicationLogger(),
00802                      "Setting global input prescale has no effect "
00803                      <<"in this version of the code");
00804     }
00805     if ( item == "globalOutputPrescale") {
00806       LOG4CPLUS_WARN(getApplicationLogger(),
00807                      "Setting global output prescale has no effect "
00808                      <<"in this version of the code");
00809     }
00810   }
00811   
00812 }
00813 
00814 //______________________________________________________________________________
00815 void FUEventProcessor::getSlavePids(xgi::Input  *in, xgi::Output *out)
00816 {
00817   for (unsigned int i=0;i<subs_.size();i++)
00818   {
00819     if (i!=0) *out << ",";
00820     *out << subs_[i].pid();
00821   }
00822 }
00823 //______________________________________________________________________________
00824 void FUEventProcessor::subWeb(xgi::Input  *in, xgi::Output *out)
00825 {
00826   using namespace cgicc;
00827   pid_t pid = 0;
00828   std::ostringstream ost;
00829   ost << "&";
00830 
00831   Cgicc cgi(in);
00832   internal::MyCgi *mycgi = (internal::MyCgi*)in;
00833   for(std::map<std::string, std::string, std::less<std::string> >::iterator mit = 
00834         mycgi->getEnvironment().begin();
00835       mit != mycgi->getEnvironment().end(); mit++)
00836     ost << mit->first << "%" << mit->second << ";";
00837   std::vector<FormEntry> els = cgi.getElements() ;
00838   std::vector<FormEntry> el1;
00839   cgi.getElement("method",el1);
00840   std::vector<FormEntry> el2;
00841   cgi.getElement("process",el2);
00842   if(el1.size()!=0) {
00843     std::string meth = el1[0].getValue();
00844     if(el2.size()!=0) {
00845       unsigned int i = 0;
00846       std::string mod = el2[0].getValue();
00847       pid = atoi(mod.c_str()); // get the process id to be polled
00848       for(; i < subs_.size(); i++)
00849         if(subs_[i].pid()==pid) break;
00850       if(i>=subs_.size()){ //process was not found, let the browser know
00851         *out << "ERROR 404 : Process " << pid << " Not Found !" << std::endl;
00852         return;
00853       } 
00854       if(subs_[i].alive() != 1){
00855         *out << "ERROR 405 : Process " << pid << " Not Alive !" << std::endl;
00856         return;
00857       }
00858       MsgBuf msg1(meth.length()+ost.str().length()+1,MSQM_MESSAGE_TYPE_WEB);
00859       strncpy(msg1->mtext,meth.c_str(),meth.length());
00860       strncpy(msg1->mtext+meth.length(),ost.str().c_str(),ost.str().length());
00861       subs_[i].post(msg1,true);
00862       unsigned int keep_supersleep_original_value = superSleepSec_.value_;
00863       superSleepSec_.value_=10*keep_supersleep_original_value;
00864       MsgBuf msg(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_WEB);
00865       bool done = false;
00866       std::vector<char *>pieces;
00867       while(!done){
00868         unsigned long retval1 = subs_[i].rcvNonBlocking(msg,true);
00869         if(retval1 == MSGQ_MESSAGE_TYPE_RANGE){
00870 	  ::sleep(1);
00871           continue;
00872         }
00873         unsigned int nbytes = atoi(msg->mtext);
00874         if(nbytes < MAX_PIPE_BUFFER_SIZE) done = true; // this will break the while loop
00875         char *buf= new char[nbytes];
00876         ssize_t retval = read(anonymousPipe_[PIPE_READ],buf,nbytes);
00877         if(retval!=nbytes) std::cout 
00878           << "CAREFUL HERE, read less bytes than expected from pipe in subWeb" << std::endl;
00879         pieces.push_back(buf);
00880       }
00881       superSleepSec_.value_=keep_supersleep_original_value;
00882       for(unsigned int j = 0; j < pieces.size(); j++){
00883         *out<<pieces[j];    // chain the buffers into the output strstream
00884         delete[] pieces[j]; //make sure to release all buffers used for reading the pipe
00885       }
00886     }
00887   }
00888 }
00889 
00890 
00891 //______________________________________________________________________________
00892 void FUEventProcessor::defaultWebPage(xgi::Input  *in, xgi::Output *out)
00893   throw (xgi::exception::Exception)
00894 {
00895 
00896 
00897   *out << "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">" 
00898        << "<html><head><title>" << getApplicationDescriptor()->getClassName() << (nbSubProcesses_.value_ > 0 ? "MP " : " ") 
00899        << getApplicationDescriptor()->getInstance() << "</title>"
00900        << "<meta https-equiv=\"REFRESH\" content=\"0;url=/evf/html/defaultBasePage.html\">"
00901        << "</head></html>";
00902 }
00903 
00904 
00905 //______________________________________________________________________________
00906 
00907 
00908 void FUEventProcessor::spotlightWebPage(xgi::Input  *in, xgi::Output *out)
00909   throw (xgi::exception::Exception)
00910 {
00911 
00912   std::string urn = getApplicationDescriptor()->getURN();
00913 
00914   *out << "<!-- base href=\"/" <<  urn
00915        << "\"> -->" << std::endl;
00916   *out << "<html>"                                                   << std::endl;
00917   *out << "<head>"                                                   << std::endl;
00918   *out << "<link type=\"text/css\" rel=\"stylesheet\"";
00919   *out << " href=\"/evf/html/styles.css\"/>"                   << std::endl;
00920   *out << "<title>" << getApplicationDescriptor()->getClassName() 
00921        << getApplicationDescriptor()->getInstance() 
00922        << " MAIN</title>"     << std::endl;
00923   *out << "</head>"                                                  << std::endl;
00924   *out << "<body>"                                                   << std::endl;
00925   *out << "<table border=\"0\" width=\"100%\">"                      << std::endl;
00926   *out << "<tr>"                                                     << std::endl;
00927   *out << "  <td align=\"left\">"                                    << std::endl;
00928   *out << "    <img"                                                 << std::endl;
00929   *out << "     align=\"middle\""                                    << std::endl;
00930   *out << "     src=\"/evf/images/spoticon.jpg\""                            << std::endl;
00931   *out << "     alt=\"main\""                                        << std::endl;
00932   *out << "     width=\"64\""                                        << std::endl;
00933   *out << "     height=\"64\""                                       << std::endl;
00934   *out << "     border=\"\"/>"                                       << std::endl;
00935   *out << "    <b>"                                                  << std::endl;
00936   *out << getApplicationDescriptor()->getClassName() 
00937        << getApplicationDescriptor()->getInstance()                  << std::endl;
00938   *out << "      " << fsm_.stateName()->toString()                   << std::endl;
00939   *out << "    </b>"                                                 << std::endl;
00940   *out << "  </td>"                                                  << std::endl;
00941   *out << "  <td width=\"32\">"                                      << std::endl;
00942   *out << "    <a href=\"/urn:xdaq-application:lid=3\">"             << std::endl;
00943   *out << "      <img"                                               << std::endl;
00944   *out << "       align=\"middle\""                                  << std::endl;
00945   *out << "       src=\"/hyperdaq/images/HyperDAQ.jpg\""             << std::endl;
00946   *out << "       alt=\"HyperDAQ\""                                  << std::endl;
00947   *out << "       width=\"32\""                                      << std::endl;
00948   *out << "       height=\"32\""                                     << std::endl;
00949   *out << "       border=\"\"/>"                                     << std::endl;
00950   *out << "    </a>"                                                 << std::endl;
00951   *out << "  </td>"                                                  << std::endl;
00952   *out << "  <td width=\"32\">"                                      << std::endl;
00953   *out << "  </td>"                                                  << std::endl;
00954   *out << "  <td width=\"32\">"                                      << std::endl;
00955   *out << "    <a href=\"/" << urn << "/\">"                         << std::endl;
00956   *out << "      <img"                                               << std::endl;
00957   *out << "       align=\"middle\""                                  << std::endl;
00958   *out << "       src=\"/evf/images/epicon.jpg\""                    << std::endl;
00959   *out << "       alt=\"main\""                                      << std::endl;
00960   *out << "       width=\"32\""                                      << std::endl;
00961   *out << "       height=\"32\""                                     << std::endl;
00962   *out << "       border=\"\"/>"                                     << std::endl;
00963   *out << "    </a>"                                                 << std::endl;
00964   *out << "  </td>"                                                  << std::endl;
00965   *out << "</tr>"                                                    << std::endl;
00966   *out << "</table>"                                                 << std::endl;
00967 
00968   *out << "<hr/>"                                                    << std::endl;
00969   
00970   std::ostringstream ost;
00971   if(myProcess_) 
00972     ost << "/SubWeb?process=" << getpid() << "&method=moduleWeb&";
00973   else
00974     ost << "/moduleWeb?";
00975   urn += ost.str();
00976   if(evtProcessor_ && (myProcess_ || nbSubProcesses_.value_==0))
00977     evtProcessor_.taskWebPage(in,out,urn);
00978   else if(evtProcessor_)
00979     evtProcessor_.summaryWebPage(in,out,urn);
00980   else
00981     *out << "<td>HLT Unconfigured</td>" << std::endl;
00982   *out << "</table>"                                                 << std::endl;
00983   
00984   *out << "<br><textarea rows=" << 10 << " cols=80 scroll=yes>"      << std::endl;
00985   *out << configuration_                                             << std::endl;
00986   *out << "</textarea><P>"                                           << std::endl;
00987   
00988   *out << "</body>"                                                  << std::endl;
00989   *out << "</html>"                                                  << std::endl;
00990 
00991 
00992 }
00993 void FUEventProcessor::scalersWeb(xgi::Input  *in, xgi::Output *out)
00994   throw (xgi::exception::Exception)
00995 {
00996 
00997   out->getHTTPResponseHeader().addHeader( "Content-Type",
00998                                           "application/octet-stream" );
00999   out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
01000                                           "binary" );
01001   if(evtProcessor_ != 0){
01002     out->write( (char*)(evtProcessor_.getPackedTriggerReportAsStruct()), sizeof(TriggerReportStatic) );
01003   }
01004 }
01005 
01006 void FUEventProcessor::pathNames(xgi::Input  *in, xgi::Output *out)
01007   throw (xgi::exception::Exception)
01008 {
01009 
01010   if(evtProcessor_ != 0){
01011     xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
01012     if(legenda !=0){
01013       std::string slegenda = ((xdata::String*)legenda)->value_;
01014       *out << slegenda << std::endl;
01015     }
01016   }
01017 }
01018 
01019 
01020 void FUEventProcessor::setAttachDqmToShm() throw (evf::Exception)  
01021 {
01022   std::string errmsg;
01023   try {
01024     edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
01025     if(edm::Service<FUShmDQMOutputService>().isAvailable())
01026       edm::Service<FUShmDQMOutputService>()->setAttachToShm();
01027   }
01028   catch (cms::Exception& e) {
01029     errmsg = "Failed to set to attach DQM service to shared memory: " + (std::string)e.what();
01030   }
01031   if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
01032 }
01033 
01034 
01035 void FUEventProcessor::attachDqmToShm() throw (evf::Exception)  
01036 {
01037   std::string errmsg;
01038   bool success = false;
01039   try {
01040     edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
01041     if(edm::Service<FUShmDQMOutputService>().isAvailable())
01042       success = edm::Service<FUShmDQMOutputService>()->attachToShm();
01043     if (!success) errmsg = "Failed to attach DQM service to shared memory";
01044   }
01045   catch (cms::Exception& e) {
01046     errmsg = "Failed to attach DQM service to shared memory: " + (std::string)e.what();
01047   }
01048   if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
01049 }
01050 
01051 
01052 
01053 void FUEventProcessor::detachDqmFromShm() throw (evf::Exception)
01054 {
01055   std::string errmsg;
01056   bool success = false;
01057   try {
01058     edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
01059     if(edm::Service<FUShmDQMOutputService>().isAvailable())
01060       success = edm::Service<FUShmDQMOutputService>()->detachFromShm();
01061     if (!success) errmsg = "Failed to detach DQM service from shared memory";
01062   }
01063   catch (cms::Exception& e) {
01064     errmsg = "Failed to detach DQM service from shared memory: " + (std::string)e.what();
01065   }
01066   if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
01067 }
01068 
01069 
01070 std::string FUEventProcessor::logsAsString()
01071 {
01072   std::ostringstream oss;
01073   if(logWrap_)
01074     {
01075       for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
01076         oss << logRing_[i] << std::endl;
01077       for(unsigned int i = 0; i <  logRingIndex_; i++)
01078         oss << logRing_[i] << std::endl;
01079     }
01080   else
01081       for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
01082         oss << logRing_[i] << std::endl;
01083     
01084   return oss.str();
01085 }
01086   
01087 void FUEventProcessor::localLog(std::string m)
01088 {
01089   timeval tv;
01090 
01091   gettimeofday(&tv,0);
01092   tm *uptm = localtime(&tv.tv_sec);
01093   char datestring[256];
01094   strftime(datestring, sizeof(datestring),"%c", uptm);
01095 
01096   if(logRingIndex_ == 0){logWrap_ = true; logRingIndex_ = logRingSize_;}
01097   logRingIndex_--;
01098   std::ostringstream timestamp;
01099   timestamp << " at " << datestring;
01100   m += timestamp.str();
01101   logRing_[logRingIndex_] = m;
01102 }
01103 
01104 void FUEventProcessor::startSupervisorLoop()
01105 {
01106   try {
01107     wlSupervising_=
01108       toolbox::task::getWorkLoopFactory()->getWorkLoop("Supervisor",
01109                                                        "waiting");
01110     if (!wlSupervising_->isActive()) wlSupervising_->activate();
01111     asSupervisor_ = toolbox::task::bind(this,&FUEventProcessor::supervisor,
01112                                         "Supervisor");
01113     wlSupervising_->submit(asSupervisor_);
01114     supervising_ = true;
01115   }
01116   catch (xcept::Exception& e) {
01117     std::string msg = "Failed to start workloop 'Supervisor'.";
01118     XCEPT_RETHROW(evf::Exception,msg,e);
01119   }
01120 }
01121 
01122 void FUEventProcessor::startReceivingLoop()
01123 {
01124   try {
01125     wlReceiving_=
01126       toolbox::task::getWorkLoopFactory()->getWorkLoop("Receiving",
01127                                                        "waiting");
01128     if (!wlReceiving_->isActive()) wlReceiving_->activate();
01129     asReceiveMsgAndExecute_ = toolbox::task::bind(this,&FUEventProcessor::receiving,
01130                                         "Receiving");
01131     wlReceiving_->submit(asReceiveMsgAndExecute_);
01132     receiving_ = true;
01133   }
01134   catch (xcept::Exception& e) {
01135     std::string msg = "Failed to start workloop 'Receiving'.";
01136     XCEPT_RETHROW(evf::Exception,msg,e);
01137   }
01138 }
01139 void FUEventProcessor::startReceivingMonitorLoop()
01140 {
01141   try {
01142     wlReceivingMonitor_=
01143       toolbox::task::getWorkLoopFactory()->getWorkLoop("ReceivingM",
01144                                                        "waiting");
01145     if (!wlReceivingMonitor_->isActive()) 
01146       wlReceivingMonitor_->activate();
01147     asReceiveMsgAndRead_ = 
01148       toolbox::task::bind(this,&FUEventProcessor::receivingAndMonitor,
01149                           "ReceivingM");
01150     wlReceivingMonitor_->submit(asReceiveMsgAndRead_);
01151     receivingM_ = true;
01152   }
01153   catch (xcept::Exception& e) {
01154     std::string msg = "Failed to start workloop 'ReceivingM'.";
01155     XCEPT_RETHROW(evf::Exception,msg,e);
01156   }
01157 }
01158 
01159 bool FUEventProcessor::receiving(toolbox::task::WorkLoop *)
01160 {
01161   MsgBuf msg;
01162   try{
01163     myProcess_->rcvSlave(msg,false); //will receive only messages from Master
01164     if(msg->mtype==MSQM_MESSAGE_TYPE_RLI)
01165       {
01166         rlimit rl;
01167         getrlimit(RLIMIT_CORE,&rl);
01168         rl.rlim_cur=0;
01169         setrlimit(RLIMIT_CORE,&rl);
01170         rlimit_coresize_changed_=true;
01171       }
01172     if (msg->mtype==MSQM_MESSAGE_TYPE_RLR)
01173       {
01174         //reset coresize limit
01175         setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
01176         rlimit_coresize_changed_=false;
01177       }
01178     if(msg->mtype==MSQM_MESSAGE_TYPE_STOP)
01179       {
01180         pthread_mutex_lock(&stop_lock_);
01181         try {
01182           fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
01183         }
01184         catch (...) {
01185           LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to go to Stopping state in slave EP, pid "
01186                                                  << getpid() << " The state on Stop event was not consistent");
01187         }
01188 
01189         try {
01190           stopClassic(); // call the normal sequence of stopping - as this is allowed to fail provisions must be made ...@@@EM
01191         }
01192         catch (...) {
01193           LOG4CPLUS_ERROR(getApplicationLogger(),"Slave EP 'receiving' workloop: exception " << getpid());
01194         }
01195 
01196         //destroy MessageService thread before exit     
01197         try{
01198           messageServicePresence_.reset();
01199         }
01200         catch(...) {
01201           LOG4CPLUS_ERROR(getApplicationLogger(),"SLAVE:Unable to destroy MessageServicePresence. pid:" << getpid() );
01202         }
01203 
01204         MsgBuf msg1(0,MSQS_MESSAGE_TYPE_STOP);
01205         myProcess_->postSlave(msg1,false);
01206         pthread_mutex_unlock(&stop_lock_);
01207         fclose(stdout);
01208         fclose(stderr);
01209         _exit(EXIT_SUCCESS);
01210       }
01211     if(msg->mtype==MSQM_MESSAGE_TYPE_FSTOP)
01212       _exit(EXIT_SUCCESS);
01213   }
01214   catch(evf::Exception &e){
01215     LOG4CPLUS_ERROR(getApplicationLogger(),"Slave EP pid:" << getpid() << " receiving WorkLoop exception: "<<e.what());
01216   }
01217   return true;
01218 }
01219 
01220 bool FUEventProcessor::supervisor(toolbox::task::WorkLoop *)
01221 {
01222   pthread_mutex_lock(&stop_lock_);
01223   if(subs_.size()!=nbSubProcesses_.value_)
01224     {
01225       pthread_mutex_lock(&pickup_lock_);
01226       if(subs_.size()!=nbSubProcesses_.value_) {
01227         subs_.resize(nbSubProcesses_.value_);
01228         spMStates_.resize(nbSubProcesses_.value_);
01229         spmStates_.resize(nbSubProcesses_.value_);
01230         for(unsigned int i = 0; i < spMStates_.size(); i++)
01231           {
01232             spMStates_[i] = edm::event_processor::sInit; 
01233             spmStates_[i] = 0; 
01234           }
01235       }
01236       pthread_mutex_unlock(&pickup_lock_);
01237     }
01238   bool running = fsm_.stateName()->toString()=="Enabled";
01239   bool stopping = fsm_.stateName()->toString()=="stopping";
01240   for(unsigned int i = 0; i < subs_.size(); i++)
01241     {
01242       if(subs_[i].alive()==-1000) continue;
01243       int sl;
01244       pid_t sub_pid = subs_[i].pid();
01245       pid_t killedOrNot = waitpid(sub_pid,&sl,WNOHANG);
01246 
01247       if(killedOrNot && killedOrNot==sub_pid) {
01248         pthread_mutex_lock(&pickup_lock_);
01249         //check if out of range or recreated (enable can clear vector)
01250         if (i<subs_.size() && subs_[i].alive()!=-1000) {
01251           subs_[i].setStatus((WIFEXITED(sl) != 0 ? 0 : -1));
01252           std::ostringstream ost;
01253           if(subs_[i].alive()==0) ost << " process exited with status " << WEXITSTATUS(sl);
01254           else if(WIFSIGNALED(sl)!=0) {
01255             ost << " process terminated with signal " << WTERMSIG(sl);
01256           }
01257           else ost << " process stopped ";
01258           //report unexpected slave exit in stop
01259           //if (stopping && (WEXITSTATUS(sl)!=0 || WIFSIGNALED(sl)!=0)) {
01260           //  LOG4CPLUS_WARN(getApplicationLogger(),ost.str() << ", slave pid:"<<getpid());
01261           //}
01262           subs_[i].countdown()=slaveRestartDelaySecs_.value_;
01263           subs_[i].setReasonForFailed(ost.str());
01264           spMStates_[i] = evtProcessor_.notstarted_state_code();
01265           spmStates_[i] = 0;
01266           std::ostringstream ost1;
01267           ost1 << "-E- Slave " << subs_[i].pid() << ost.str();
01268           localLog(ost1.str());
01269           if(!autoRestartSlaves_.value_) subs_[i].disconnect();
01270         }
01271         pthread_mutex_unlock(&pickup_lock_);
01272       }
01273     }
01274   pthread_mutex_unlock(&stop_lock_);    
01275   if(stopping) return true; // if in stopping we are done
01276 
01277   // check if we need to reset core dumps (15 min after last one)
01278   if (running && rlimit_coresize_changed_) {
01279     timeval newtv;
01280     gettimeofday(&newtv,0);
01281     int delta = newtv.tv_sec-lastCrashTime_.tv_sec;
01282     if (delta>60*15) {
01283       std::ostringstream ostr;
01284       ostr << " No more slave EP crashes on this machine in last 15 min. resetting core size limits";
01285       std::cout << ostr.str() << std::endl;
01286       LOG4CPLUS_INFO(getApplicationLogger(),ostr.str());
01287       setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
01288       MsgBuf master_message_rlr_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLR);
01289       for (unsigned int i = 0; i < subs_.size(); i++) {
01290         try {
01291           if (subs_[i].alive())
01292             subs_[i].post(master_message_rlr_,false);
01293         }
01294         catch (...) {}
01295       }
01296       rlimit_coresize_changed_=false;
01297       crashesThisRun_=0;
01298     }
01299   }
01300 
01301   if(running && edm_init_done_)
01302     {
01303       // if enabled, this loop will periodically check if dead slaves countdown has expired and restart them
01304       // this is only active while running, hence, the stop lock is acquired and only released at end of loop
01305       if(autoRestartSlaves_.value_){
01306         pthread_mutex_lock(&stop_lock_); //lockout slave killing at stop while you check for restarts
01307         for(unsigned int i = 0; i < subs_.size(); i++)
01308           {
01309             if(subs_[i].alive() != 1){
01310               if(subs_[i].countdown() == 0)
01311                 {
01312                   if(subs_[i].restartCount()>2){
01313                     LOG4CPLUS_WARN(getApplicationLogger()," Not restarting subprocess in slot " << i 
01314                                    << " - maximum restart count reached");
01315                     std::ostringstream ost1;
01316                     ost1 << "-W- Dead Process in slot " << i << " reached maximum restart count"; 
01317                     localLog(ost1.str());
01318                     subs_[i].countdown()--;
01319                     XCEPT_DECLARE(evf::Exception,
01320                                   sentinelException, ost1.str());
01321                     notifyQualified("error",sentinelException);
01322                     subs_[i].disconnect();
01323                     continue;
01324                   }
01325                   subs_[i].restartCount()++;
01326                   if (forkInEDM_.value_) {
01327                     restartForkInEDM(i);
01328                   }
01329                   else {
01330                     pid_t rr = subs_[i].forkNew();
01331                     if(rr==0)
01332                     {
01333                       myProcess_=&subs_[i];
01334                       scalersUpdates_ = 0;
01335                       int retval = pthread_mutex_destroy(&stop_lock_);
01336                       if(retval != 0) perror("error");
01337                       retval = pthread_mutex_init(&stop_lock_,0);
01338                       if(retval != 0) perror("error");
01339                       fsm_.disableRcmsStateNotification();
01340                       fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
01341                       fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition
01342                       fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition
01343                       try{
01344                         xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
01345                         if(lsid) {
01346                           ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died
01347                         }
01348                       }
01349                       catch(...){
01350                         std::cout << "trouble with lsindex during restart" << std::endl;
01351                       }
01352                       try{
01353                         xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
01354                         if(lstb) {
01355                           ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting
01356                         }
01357                       }
01358                       catch(...){
01359                         std::cout << "trouble with resetting flag for eol recovery " << std::endl;
01360                       }
01361 
01362                       evtProcessor_.adjustLsIndexForRestart();
01363                       evtProcessor_.resetPackedTriggerReport();
01364                       enableMPEPSlave();
01365                       return false; // exit the supervisor loop immediately in the child !!!
01366                     }
01367                   else
01368                     {
01369                       std::ostringstream ost1;
01370                       ost1 << "-I- New Process " << rr << " forked for slot " << i; 
01371                       localLog(ost1.str());
01372                     }
01373                   }
01374                 }
01375               if(subs_[i].countdown()>=0) subs_[i].countdown()--;
01376             }
01377           }
01378         pthread_mutex_unlock(&stop_lock_);
01379       } // finished handling replacement of dead slaves once they've been reaped
01380     }
01381   xdata::Serializable *lsid = 0; 
01382   xdata::Serializable *psid = 0;
01383   xdata::Serializable *dqmp = 0;
01384   xdata::UnsignedInteger32 *dqm = 0;
01385 
01386 
01387   
01388   if(running && edm_init_done_){  
01389     try{
01390       lsid = applicationInfoSpace_->find("lumiSectionIndex");
01391       psid = applicationInfoSpace_->find("prescaleSetIndex");
01392       nbProcessed = monitorInfoSpace_->find("nbProcessed");
01393       nbAccepted  = monitorInfoSpace_->find("nbAccepted");
01394       dqmp = applicationInfoSpace_-> find("nbDqmUpdates");      
01395     }
01396     catch(xdata::exception::Exception e){
01397       LOG4CPLUS_INFO(getApplicationLogger(),"could not retrieve some data - " << e.what());    
01398     }
01399 
01400     try{
01401       if(nbProcessed !=0 && nbAccepted !=0)
01402         {
01403           xdata::UnsignedInteger32*nbp = ((xdata::UnsignedInteger32*)nbProcessed);
01404           xdata::UnsignedInteger32*nba = ((xdata::UnsignedInteger32*)nbAccepted);
01405           xdata::UnsignedInteger32*ls  = ((xdata::UnsignedInteger32*)lsid);
01406           xdata::UnsignedInteger32*ps  = ((xdata::UnsignedInteger32*)psid);
01407           if(dqmp!=0)
01408             dqm = (xdata::UnsignedInteger32*)dqmp;
01409           if(dqm) dqm->value_ = 0;
01410           nbTotalDQM_ = 0;
01411           nbp->value_ = 0;
01412           nba->value_ = 0;
01413           nblive_ = 0;
01414           nbdead_ = 0;
01415           scalersUpdates_ = 0;
01416 
01417           for(unsigned int i = 0; i < subs_.size(); i++)
01418             {
01419               if(subs_[i].alive()>0)
01420                 {
01421                   nblive_++;
01422                   try{
01423                     subs_[i].post(master_message_prg_,true);
01424                     
01425                     unsigned long retval = subs_[i].rcvNonBlocking(master_message_prr_,true);
01426                     if(retval == (unsigned long) master_message_prr_->mtype){
01427                       prg* p = (struct prg*)(master_message_prr_->mtext);
01428                       subs_[i].setParams(p);
01429                       spMStates_[i] = p->Ms;
01430                       spmStates_[i] = p->ms;
01431                       cpustat_->addEntry(p->ms);
01432                       if(!subs_[i].inInconsistentState() && 
01433                          (p->Ms == edm::event_processor::sError 
01434                           || p->Ms == edm::event_processor::sInvalid
01435                           || p->Ms == edm::event_processor::sStopping))
01436                         {
01437                           std::ostringstream ost;
01438                           ost << "edm::eventprocessor slot " << i << " process id " 
01439                               << subs_[i].pid() << " not in Running state : Mstate=" 
01440                               << evtProcessor_.stateNameFromIndex(p->Ms) << " mstate="
01441                               << evtProcessor_.moduleNameFromIndex(p->ms) 
01442                               << " - Look into possible error messages from HLT process";
01443                           LOG4CPLUS_WARN(getApplicationLogger(),ost.str());
01444                         }
01445                       nbp->value_ += subs_[i].params().nbp;
01446                       nba->value_  += subs_[i].params().nba;
01447                       if(dqm)dqm->value_ += p->dqm;
01448                       nbTotalDQM_ +=  p->dqm;
01449                       scalersUpdates_ += p->trp;
01450                       if(p->ls > ls->value_) ls->value_ = p->ls;
01451                       if(p->ps != ps->value_) ps->value_ = p->ps;
01452                     }
01453                     else{
01454                       nbp->value_ += subs_[i].get_save_nbp();
01455                       nba->value_ += subs_[i].get_save_nba();
01456                     }
01457                   } 
01458                   catch(evf::Exception &e){
01459                     LOG4CPLUS_INFO(getApplicationLogger(),
01460                                    "could not send/receive msg on slot " 
01461                                    << i << " - " << e.what());    
01462                   }
01463                     
01464                 }
01465               else
01466                 {
01467                   nbp->value_ += subs_[i].get_save_nbp();
01468                   nba->value_ += subs_[i].get_save_nba();
01469                   nbdead_++;
01470                 }
01471             }
01472           if(nbp->value_>64){//have some slaves already processed more than one event ? (eventually make this == number of raw cells)
01473             for(unsigned int i = 0; i < subs_.size(); i++)
01474               {
01475                 if(subs_[i].params().nbp == 0){ // a slave has processed 0 events 
01476                   // check that the process is not stuck
01477                   if(subs_[i].alive()>0 && subs_[i].params().ms == 0) // the process is seen alive but in us=Invalid(0)
01478                     {
01479                       subs_[i].found_invalid();//increase the "found_invalid" counter
01480                       if(subs_[i].nfound_invalid() > 60){ //wait x monitor cycles (~1 min a good time ?) before doing something about a stuck slave
01481                         MsgBuf msg3(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_FSTOP);      // send a force-stop signal             
01482                         subs_[i].post(msg3,false);
01483                         std::ostringstream ost1;
01484                         ost1 << "-W- Process in slot " << i << " Never reached the running state - forcestopping it"; 
01485                         localLog(ost1.str());
01486                         LOG4CPLUS_ERROR(getApplicationLogger(),ost1.str());    
01487                         XCEPT_DECLARE(evf::Exception,
01488                                       sentinelException, ost1.str());
01489                         notifyQualified("error",sentinelException);
01490 
01491                       }
01492                     }
01493                 }
01494               }
01495           }
01496         }
01497     }
01498     catch(std::exception &e){
01499       LOG4CPLUS_INFO(getApplicationLogger(),"std exception - " << e.what());    
01500     }
01501     catch(...){
01502       LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception ");    
01503     }
01504   }
01505   else{
01506     for(unsigned int i = 0; i < subs_.size(); i++)
01507       {
01508         if(subs_[i].alive()==-1000)
01509           {
01510             spMStates_[i] = edm::event_processor::sInit;
01511             spmStates_[i] = 0;
01512           }
01513       }
01514   }
01515   try{
01516     monitorInfoSpace_->lock();
01517     monitorInfoSpace_->fireItemGroupChanged(names_,0);
01518     monitorInfoSpace_->unlock();
01519   }
01520   catch(xdata::exception::Exception &e)
01521     {
01522       LOG4CPLUS_ERROR(log_, "Exception from fireItemGroupChanged: " << e.what());
01523       //        localLog(e.what());
01524     }
01525   ::sleep(superSleepSec_.value_);       
01526   return true;
01527 }
01528 
01529 void FUEventProcessor::startScalersWorkLoop() throw (evf::Exception)
01530 {
01531   try {
01532     wlScalers_=
01533       toolbox::task::getWorkLoopFactory()->getWorkLoop("Scalers",
01534                                                        "waiting");
01535     if (!wlScalers_->isActive()) wlScalers_->activate();
01536     asScalers_ = toolbox::task::bind(this,&FUEventProcessor::scalers,
01537                                      "Scalers");
01538     
01539   wlScalers_->submit(asScalers_);
01540   wlScalersActive_ = true;
01541   }
01542   catch (xcept::Exception& e) {
01543     std::string msg = "Failed to start workloop 'Scalers'.";
01544     XCEPT_RETHROW(evf::Exception,msg,e);
01545   }
01546 }
01547 
01548 //______________________________________________________________________________
01549 
01550 void FUEventProcessor::startSummarizeWorkLoop() throw (evf::Exception)
01551 {
01552   try {
01553     wlSummarize_=
01554       toolbox::task::getWorkLoopFactory()->getWorkLoop("Summary",
01555                                                        "waiting");
01556     if (!wlSummarize_->isActive()) wlSummarize_->activate();
01557     
01558     asSummarize_ = toolbox::task::bind(this,&FUEventProcessor::summarize,
01559                                        "Summary");
01560 
01561     wlSummarize_->submit(asSummarize_);
01562     wlSummarizeActive_ = true;
01563   }
01564   catch (xcept::Exception& e) {
01565     std::string msg = "Failed to start workloop 'Summarize'.";
01566     XCEPT_RETHROW(evf::Exception,msg,e);
01567   }
01568 }
01569 
01570 //______________________________________________________________________________
01571 
01572 bool FUEventProcessor::scalers(toolbox::task::WorkLoop* wl)
01573 {
01574   if(evtProcessor_)
01575     {
01576       if(!evtProcessor_.getTriggerReport(true)) {
01577         wlScalersActive_ = false;
01578         return false;
01579       }
01580     }
01581   else
01582     {
01583       std::cout << getpid()<< " Scalers workloop, bailing out, no evtProcessor " << std::endl;
01584       wlScalersActive_ = false;
01585       return false;
01586     }
01587   if(myProcess_) 
01588     {
01589       //      std::cout << getpid() << "going to post on control queue from scalers" << std::endl;
01590       int ret = myProcess_->postSlave(evtProcessor_.getPackedTriggerReport(),false);
01591       if(ret!=0)      std::cout << "scalers workloop, error posting to sqs_ " << errno << std::endl;
01592       scalersUpdates_++;
01593     }
01594   else
01595     evtProcessor_.fireScalersUpdate();
01596   return true;
01597 }
01598 
01599 //______________________________________________________________________________
01600 bool FUEventProcessor::summarize(toolbox::task::WorkLoop* wl)
01601 {
01602   evtProcessor_.resetPackedTriggerReport();
01603   bool atLeastOneProcessUpdatedSuccessfully = false;
01604   int msgCount = 0;
01605   for (unsigned int i = 0; i < subs_.size(); i++)
01606     {
01607       if(subs_[i].alive()>0)
01608         {
01609           int ret = 0;
01610           if(subs_[i].check_postponed_trigger_update(master_message_trr_,
01611                                                      evtProcessor_.getLumiSectionReferenceIndex()))
01612             {
01613               ret = MSQS_MESSAGE_TYPE_TRR;
01614               std::cout << "using postponed report from slot " << i << " for ls " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01615             }
01616           else{
01617             bool insync = false;
01618             bool exception_caught = false;
01619             while(!insync){
01620               try{
01621                 ret = subs_[i].rcv(master_message_trr_,false);
01622               }
01623               catch(evf::Exception &e)
01624                 {
01625                   std::cout << "exception in msgrcv on " << i 
01626                             << " " << subs_[i].alive() << " " << strerror(errno) << std::endl;
01627                   exception_caught = true;
01628                   break;
01629                   //do nothing special
01630                 }
01631               if(ret==MSQS_MESSAGE_TYPE_TRR) {
01632                 TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
01633                 if(trp->lumiSection >= evtProcessor_.getLumiSectionReferenceIndex()){
01634                   insync = true;
01635                 }
01636               }
01637             }
01638             if(exception_caught) continue;
01639           }
01640           msgCount++;
01641           if(ret==MSQS_MESSAGE_TYPE_TRR) {
01642             TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
01643             if(trp->lumiSection > evtProcessor_.getLumiSectionReferenceIndex()){
01644               std::cout << "postpone handling of msg from slot " << i << " with Ls " <<  trp->lumiSection
01645                         << " should be " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01646               subs_[i].add_postponed_trigger_update(master_message_trr_);
01647             }else{
01648               atLeastOneProcessUpdatedSuccessfully = true;
01649               evtProcessor_.sumAndPackTriggerReport(master_message_trr_);
01650             }
01651           }
01652           else std::cout << "msgrcv returned error " << errno << std::endl;
01653         }
01654     }
01655   if(atLeastOneProcessUpdatedSuccessfully){
01656     nbSubProcessesReporting_.value_ = msgCount;
01657     evtProcessor_.getPackedTriggerReportAsStruct()->nbExpected = nbSubProcesses_.value_;
01658     evtProcessor_.getPackedTriggerReportAsStruct()->nbReporting = nbSubProcessesReporting_.value_;
01659     evtProcessor_.updateRollingReport();
01660     evtProcessor_.fireScalersUpdate();
01661   }
01662   else{
01663     LOG4CPLUS_WARN(getApplicationLogger(),"Summarize loop: no process updated successfully - sleep 10 seconds before trying again");          
01664     if(msgCount==0) evtProcessor_.withdrawLumiSectionIncrement();
01665     nbSubProcessesReporting_.value_ = 0;
01666     ::sleep(10);
01667   }
01668   if(fsm_.stateName()->toString()!="Enabled"){
01669     wlScalersActive_ = false;
01670     return false;
01671   }
01672   //  cpustat_->printStat();
01673   if(iDieStatisticsGathering_.value_){
01674     try{
01675       unsigned long long idleTmp=idleProcStats_;
01676       unsigned long long allPSTmp=allProcStats_;
01677       idleProcStats_=allProcStats_=0;
01678 
01679       utils::procCpuStat(idleProcStats_,allProcStats_);
01680       timeval oldtime=lastProcReport_;
01681       gettimeofday(&lastProcReport_,0);
01682 
01683       if (allPSTmp!=0 && idleTmp!=0 && allProcStats_!=allPSTmp) {
01684         cpustat_->setCPUStat(1000 - ((idleProcStats_-idleTmp)*1000)/(allProcStats_-allPSTmp));
01685         int deltaTms=1000 * (lastProcReport_.tv_sec-oldtime.tv_sec)
01686                     + (lastProcReport_.tv_usec-oldtime.tv_usec)/1000;
01687         cpustat_->setElapsed(deltaTms);
01688       }
01689       else {
01690         cpustat_->setCPUStat(0);
01691         cpustat_->setElapsed(0);
01692       }
01693 
01694       TriggerReportStatic *trsp = evtProcessor_.getPackedTriggerReportAsStruct();
01695       cpustat_ ->setNproc(trsp->eventSummary.totalEvents);
01696       cpustat_ ->sendStat(evtProcessor_.getLumiSectionReferenceIndex());
01697       ratestat_->sendStat((unsigned char*)trsp,
01698                           sizeof(TriggerReportStatic),
01699                           evtProcessor_.getLumiSectionReferenceIndex());
01700     }catch(evf::Exception &e){
01701       LOG4CPLUS_INFO(getApplicationLogger(),"coud not send statistics"
01702                      << e.what());
01703     }
01704   }
01705   cpustat_->reset();
01706   return true;
01707 }
01708 
01709 
01710 
01711 bool FUEventProcessor::receivingAndMonitor(toolbox::task::WorkLoop *)
01712 {
01713   try{
01714     myProcess_->rcvSlave(slave_message_monitoring_,true); //will receive only messages from Master
01715     switch(slave_message_monitoring_->mtype)
01716       {
01717       case MSQM_MESSAGE_TYPE_MCS:
01718         {
01719           xgi::Input *in = 0;
01720           xgi::Output out;
01721           evtProcessor_.microState(in,&out);
01722           MsgBuf msg1(out.str().size(),MSQS_MESSAGE_TYPE_MCR);
01723           strncpy(msg1->mtext,out.str().c_str(),out.str().size());
01724           myProcess_->postSlave(msg1,true);
01725           break;
01726         }
01727       
01728       case MSQM_MESSAGE_TYPE_PRG:
01729         {
01730           xdata::Serializable *dqmp = 0;
01731           xdata::UnsignedInteger32 *dqm = 0;
01732           evtProcessor_.monitoring(0);
01733           try{
01734             dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
01735           }  catch(xdata::exception::Exception e){}
01736           if(dqmp!=0)
01737             dqm = (xdata::UnsignedInteger32*)dqmp;
01738 
01739           //      monitorInfoSpace_->lock();  
01740           prg * data           = (prg*)slave_message_prr_->mtext;
01741           data->ls             = evtProcessor_.lsid_;
01742           data->eols           = evtProcessor_.lastLumiUsingEol_;
01743           data->ps             = evtProcessor_.psid_;
01744           data->nbp            = evtProcessor_->totalEvents();
01745           data->nba            = evtProcessor_->totalEventsPassed();
01746           data->Ms             = evtProcessor_.epMAltState_.value_;
01747           data->ms             = evtProcessor_.epmAltState_.value_;
01748           if(dqm) data->dqm    = dqm->value_; else data->dqm = 0;
01749           data->trp            = scalersUpdates_;
01750           //      monitorInfoSpace_->unlock();  
01751           myProcess_->postSlave(slave_message_prr_,true);
01752           if(exitOnError_.value_)
01753           { 
01754             // after each monitoring cycle check if we are in inconsistent state and exit if configured to do so  
01755             //      std::cout << getpid() << "receivingAndMonitor: trying to acquire stop lock " << std::endl;
01756             if(data->Ms == edm::event_processor::sStopping || data->Ms == edm::event_processor::sError) 
01757               { 
01758                 bool running = true;
01759                 int count = 0;
01760                 while(running){
01761                   int retval = pthread_mutex_lock(&stop_lock_);
01762                   if(retval != 0) perror("error");
01763                   running = fsm_.stateName()->toString()=="Enabled";
01764                   if(count>5) _exit(-1);
01765                   pthread_mutex_unlock(&stop_lock_);
01766                   if(running) {::sleep(1); count++;}
01767                 }
01768               }
01769             
01770           }
01771           //      scalersUpdates_++;
01772           break;
01773         }
01774       case MSQM_MESSAGE_TYPE_WEB:
01775         {
01776           xgi::Input  *in = 0;
01777           xgi::Output out;
01778           unsigned int bytesToSend = 0;
01779           MsgBuf msg1(NUMERIC_MESSAGE_SIZE,MSQS_MESSAGE_TYPE_WEB);
01780           std::string query = slave_message_monitoring_->mtext;
01781           size_t pos = query.find_first_of("&");
01782           std::string method;
01783           std::string args;
01784           if(pos!=std::string::npos)  
01785             {
01786               method = query.substr(0,pos);
01787               args = query.substr(pos+1,query.length()-pos-1);
01788             }
01789           else
01790             method=query;
01791 
01792           if(method=="Spotlight")
01793             {
01794               spotlightWebPage(in,&out);
01795             }
01796           else if(method=="procStat")
01797             {
01798               procStat(in,&out);
01799             }
01800           else if(method=="moduleWeb")
01801             {
01802               internal::MyCgi mycgi;
01803               boost::char_separator<char> sep(";");
01804               boost::tokenizer<boost::char_separator<char> > tokens(args, sep);
01805               for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
01806                    tok_iter != tokens.end(); ++tok_iter){
01807                 size_t pos = (*tok_iter).find_first_of("%");
01808                 if(pos != std::string::npos){
01809                   std::string first  = (*tok_iter).substr(0    ,                        pos);
01810                   std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
01811                   mycgi.getEnvironment()[first]=second;
01812                 }
01813               }
01814               moduleWeb(&mycgi,&out);
01815             }
01816           else if(method=="Default")
01817             {
01818               defaultWebPage(in,&out);
01819             }
01820           else 
01821             {
01822               out << "Error 404!!!!!!!!" << std::endl;
01823             }
01824 
01825 
01826           bytesToSend = out.str().size();
01827           unsigned int cycle = 0;
01828           if(bytesToSend==0)
01829             {
01830               snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", bytesToSend);
01831               myProcess_->postSlave(msg1,true);
01832             }
01833           while(bytesToSend !=0){
01834             unsigned int msgSize = bytesToSend>MAX_PIPE_BUFFER_SIZE ? MAX_PIPE_BUFFER_SIZE : bytesToSend;
01835             write(anonymousPipe_[PIPE_WRITE],
01836                   out.str().c_str()+MAX_PIPE_BUFFER_SIZE*cycle,
01837                   msgSize);
01838             snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", msgSize);
01839             myProcess_->postSlave(msg1,true);
01840             bytesToSend -= msgSize;
01841             cycle++;
01842           }
01843           break;
01844         }
01845       case MSQM_MESSAGE_TYPE_TRP:
01846         {
01847           break;
01848         }
01849       }
01850   }
01851   catch(evf::Exception &e){std::cout << "exception caught in recevingM: " << e.what() << std::endl;}
01852   return true;
01853 }
01854 
01855 void FUEventProcessor::startSignalMonitorWorkLoop() throw (evf::Exception)
01856 {
01857   //todo rewind/check semaphore
01858   //start workloop
01859   try {
01860     wlSignalMonitor_=
01861       toolbox::task::getWorkLoopFactory()->getWorkLoop("SignalMonitor",
01862                                                        "waiting");
01863 
01864     if (!wlSignalMonitor_->isActive()) wlSignalMonitor_->activate();
01865     asSignalMonitor_ = toolbox::task::bind(this,&FUEventProcessor::sigmon,
01866                                        "SignalMonitor");
01867     wlSignalMonitor_->submit(asSignalMonitor_);
01868     signalMonitorActive_ = true;
01869   }
01870   catch (xcept::Exception& e) {
01871     std::string msg = "Failed to start workloop 'SignalMonitor'. (3)";
01872     std::cout << e.what() << std::endl;
01873     XCEPT_RETHROW(evf::Exception,msg,e);
01874   }
01875 }
01876  
01877 
01878 bool FUEventProcessor::sigmon(toolbox::task::WorkLoop* wl)
01879 {
01880   while (1) {
01881     sem_wait(sigmon_sem_);
01882     std::cout << " received signal notification from slave!"<< std::endl;
01883   
01884     //check if shutdown time
01885     bool running = fsm_.stateName()->toString()=="Enabled";
01886     bool stopping = fsm_.stateName()->toString()=="stopping";
01887     bool enabling = fsm_.stateName()->toString()=="enabling";
01888     if (!running && !enabling) {
01889       signalMonitorActive_ = false;
01890       return false;
01891     }
01892 
01893     crashesThisRun_++;
01894     gettimeofday(&lastCrashTime_,0);
01895 
01896     //set core size limit to 0 in master and slaves
01897     if (crashesThisRun_>=crashesToDump_.value_ && (running || stopping) && !rlimit_coresize_changed_) {
01898 
01899       rlimit rlold;
01900       getrlimit(RLIMIT_CORE,&rlold);
01901       rlimit rlnew = rlold;
01902       rlnew.rlim_cur=0;
01903       setrlimit(RLIMIT_CORE,&rlnew);
01904       rlimit_coresize_changed_=true;
01905       MsgBuf master_message_rli_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLI);
01906       //in case of frequent crashes, allow first slot to dump (until restart)
01907       unsigned int min=1;
01908       for (unsigned int i = min; i < subs_.size(); i++) {
01909         try {
01910           if (subs_[i].alive()) {
01911             subs_[i].post(master_message_rli_,false);
01912           }
01913         }
01914         catch (...) {}
01915       }
01916       std::ostringstream ostr;
01917       ostr << "Number of recent slave crashes reaches " << crashesThisRun_
01918            << ". Disabling core dumps for next 15 minutes in this FilterUnit";
01919       LOG4CPLUS_WARN(getApplicationLogger(),ostr.str());
01920     }
01921   }//end while loop
01922   signalMonitorActive_ = false;
01923   return false;
01924 }
01925 
01926 
01927 bool FUEventProcessor::enableCommon()
01928 {
01929   try {    
01930     if(hasShMem_) attachDqmToShm();
01931     int sc = 0;
01932     evtProcessor_->clearCounters();
01933     if(isRunNumberSetter_)
01934       evtProcessor_->setRunNumber(runNumber_.value_);
01935     else
01936       evtProcessor_->declareRunNumber(runNumber_.value_);
01937 
01938     try{
01939       ::sleep(1);
01940       evtProcessor_->runAsync();
01941       sc = evtProcessor_->statusAsync();
01942     }
01943     catch(cms::Exception &e) {
01944       reasonForFailedState_ = e.explainSelf();
01945       fsm_.fireFailed(reasonForFailedState_,this);
01946       localLog(reasonForFailedState_);
01947       return false;
01948     }    
01949     catch(std::exception &e) {
01950       reasonForFailedState_  = e.what();
01951       fsm_.fireFailed(reasonForFailedState_,this);
01952       localLog(reasonForFailedState_);
01953       return false;
01954     }
01955     catch(...) {
01956       reasonForFailedState_ = "Unknown Exception";
01957       fsm_.fireFailed(reasonForFailedState_,this);
01958       localLog(reasonForFailedState_);
01959       return false;
01960     }
01961     if(sc != 0) {
01962       std::ostringstream oss;
01963       oss<<"EventProcessor::runAsync returned status code " << sc;
01964       reasonForFailedState_ = oss.str();
01965       fsm_.fireFailed(reasonForFailedState_,this);
01966       localLog(reasonForFailedState_);
01967       return false;
01968     }
01969   }
01970   catch (xcept::Exception &e) {
01971     reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
01972     fsm_.fireFailed(reasonForFailedState_,this);
01973     localLog(reasonForFailedState_);
01974     return false;
01975   }
01976   try{
01977     fsm_.fireEvent("EnableDone",this);
01978   }
01979   catch (xcept::Exception &e) {
01980     std::cout << "exception " << (std::string)e.what() << std::endl;
01981     throw;
01982   }
01983 
01984   return false;
01985 }
01986 
01987 void FUEventProcessor::forkProcessFromEDM_helper(void * addr) {
01988   ((FUEventProcessor*)addr)->forkProcessesFromEDM();
01989 }
01990 
01991 void FUEventProcessor::forkProcessesFromEDM() {
01992 
01993   moduleweb::ForkParams * forkParams = &(forkInfoObj_->forkParams);
01994   unsigned int forkFrom=0;
01995   unsigned int forkTo=nbSubProcesses_.value_;
01996   if (forkParams->slotId>=0) {
01997     forkFrom=forkParams->slotId;
01998     forkTo=forkParams->slotId+1;
01999   }
02000 
02001   //before fork, make sure to disconnect output modules from Shm
02002   try {
02003     if (sorRef_) {
02004       std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
02005       for (unsigned int i=0;i<shmOutputs.size();i++) {
02006         //unregister PID from ShmBuffer/RB
02007         shmOutputs[i]->unregisterFromShm();
02008         //disconnect from Shm
02009         shmOutputs[i]->stop();
02010       }
02011     }
02012   }
02013   catch (std::exception &e)
02014   {
02015     reasonForFailedState_ =  (std::string)"Thrown exception while disconnecting ShmOutputModule from Shm: " + e.what();
02016     LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02017     fsm_.fireFailed(reasonForFailedState_,this);
02018     localLog(reasonForFailedState_);
02019   }
02020   catch (...) {
02021     reasonForFailedState_ =  "Thrown unknown exception while disconnecting ShmOutputModule from Shm: ";
02022     LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02023     fsm_.fireFailed(reasonForFailedState_,this);
02024     localLog(reasonForFailedState_);
02025   }
02026 
02027   std::string currentState = fsm_.stateName()->toString();
02028 
02029   //destroy MessageServicePresence thread before fork
02030   if (currentState!="stopping") {
02031     try {
02032       messageServicePresence_.reset();
02033     }
02034     catch (...) {
02035       LOG4CPLUS_ERROR(getApplicationLogger(),"Unable to destroy MessageService thread before fork!");
02036     }
02037   }
02038 
02039   if (currentState=="stopping") {
02040     LOG4CPLUS_ERROR(getApplicationLogger(),"Can not fork subprocesses in state " << fsm_.stateName()->toString());
02041     forkParams->isMaster=1;
02042     forkInfoObj_->forkParams.slotId=-1;
02043     forkInfoObj_->forkParams.restart=0;
02044   }
02045   //fork loop
02046   else for(unsigned int i=forkFrom; i<forkTo; i++)
02047   {
02048     int retval = subs_[i].forkNew();
02049     if(retval==0)
02050     {
02051       forkParams->isMaster=0;
02052       myProcess_ = &subs_[i];
02053       // dirty hack: delete/recreate global binary semaphore for later use in child
02054       delete toolbox::mem::_s_mutex_ptr_;
02055       toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
02056       int retval = pthread_mutex_destroy(&stop_lock_);
02057       if(retval != 0) perror("error");
02058       retval = pthread_mutex_init(&stop_lock_,0);
02059       if(retval != 0) perror("error");
02060       fsm_.disableRcmsStateNotification();
02061 
02062       //recreate MessageLogger thread in slave after fork
02063       try{
02064         edm::PresenceFactory *pf = edm::PresenceFactory::get();
02065         if(pf != 0) {
02066           messageServicePresence_ = pf->makePresence("MessageServicePresence");
02067         }
02068         else {
02069           LOG4CPLUS_ERROR(getApplicationLogger(),
02070               "SLAVE: Unable to create message service presence. pid:"<<getpid());
02071         }
02072       }
02073       catch(...) {
02074         LOG4CPLUS_ERROR(getApplicationLogger(),"SLAVE: Unknown Exception in MessageServicePresence. pid:"<<getpid());
02075       }
02076 
02077       ML::MLlog4cplus::setAppl(this);
02078 
02079       //reconnect to Shm from output modules
02080       try {
02081         if (sorRef_) {
02082           std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
02083           for (unsigned int i=0;i<shmOutputs.size();i++)
02084             shmOutputs[i]->start();
02085         }
02086       }
02087       catch (...)
02088       {
02089         LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception (ShmOutputModule sending InitMsg (pid:"<<getpid() <<")");
02090       }
02091 
02092       if (forkParams->restart) {
02093         //do restart things
02094         scalersUpdates_ = 0;
02095         try {
02096           fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
02097           fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition
02098           fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition
02099         } catch (...) {
02100           LOG4CPLUS_WARN(getApplicationLogger(),"Failed to Stop/Enable FSM of the restarted slave EP");
02101         }
02102         try{
02103           xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
02104           if(lsid) {
02105             ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died
02106           }
02107         }
02108         catch(...){
02109           std::cout << "trouble with lsindex during restart" << std::endl;
02110         }
02111         try{
02112           xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
02113           if(lstb) {
02114             ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting
02115           }
02116         }
02117         catch(...){
02118           std::cout << "trouble with resetting flag for eol recovery " << std::endl;
02119         }
02120         evtProcessor_.adjustLsIndexForRestart();
02121         evtProcessor_.resetPackedTriggerReport();
02122       }
02123 
02124       //start other threads
02125       startReceivingLoop();
02126       startReceivingMonitorLoop();
02127       startScalersWorkLoop();
02128       while(!evtProcessor_.isWaitingForLs())
02129         ::usleep(100000);//wait for scalers loop to start
02130 
02131       //connect DQMShmOutputModule
02132       if(hasShMem_) attachDqmToShm();
02133 
02134       //catch transition error if we are already Enabled
02135       try {
02136         fsm_.fireEvent("EnableDone",this);
02137       }
02138       catch (...) {}
02139 
02140       //make sure workloops are started
02141       while (!wlReceiving_->isActive() || !wlReceivingMonitor_->isActive()) usleep(10000);
02142 
02143       //unmask signals
02144       sigset_t tmpset_thread;
02145       sigemptyset(&tmpset_thread);
02146       sigaddset(&tmpset_thread, SIGQUIT);
02147       sigaddset(&tmpset_thread, SIGILL);
02148       sigaddset(&tmpset_thread, SIGABRT);
02149       sigaddset(&tmpset_thread, SIGFPE);
02150       sigaddset(&tmpset_thread, SIGSEGV);
02151       sigaddset(&tmpset_thread, SIGALRM);
02152       //sigprocmask(SIG_UNBLOCK, &tmpset_thread, 0);
02153       pthread_sigmask(SIG_UNBLOCK,&tmpset_thread,0);
02154      
02155       //set signal handlers 
02156       struct sigaction sa;
02157       sigset_t tmpset;
02158       memset(&tmpset,0,sizeof(tmpset));
02159       sigemptyset(&tmpset);
02160       sa.sa_mask=tmpset;
02161       sa.sa_flags=SA_RESETHAND | SA_SIGINFO;
02162       sa.sa_handler=0;
02163       sa.sa_sigaction=evfep_sighandler;
02164 
02165       sigaction(SIGQUIT,&sa,0);
02166       sigaction(SIGILL,&sa,0);
02167       sigaction(SIGABRT,&sa,0);
02168       sigaction(SIGFPE,&sa,0);
02169       sigaction(SIGSEGV,&sa,0);
02170       sa.sa_sigaction=evfep_alarmhandler;
02171       sigaction(SIGALRM,&sa,0);
02172 
02173       //child return to DaqSource
02174       return ;
02175     }
02176     else {
02177 
02178       forkParams->isMaster=1;
02179       forkInfoObj_->forkParams.slotId=-1;
02180       if (forkParams->restart) {
02181         std::ostringstream ost1;
02182         ost1 << "-I- New Process " << retval << " forked for slot " << forkParams->slotId;
02183         localLog(ost1.str());
02184       }
02185       forkInfoObj_->forkParams.restart=0;
02186       //start "crash" receiver workloop
02187     }
02188   }
02189 
02190   //recreate MessageLogger thread after fork
02191   try{
02192     //release the presense factory in master
02193     edm::PresenceFactory *pf = edm::PresenceFactory::get();
02194     if(pf != 0) {
02195       messageServicePresence_ = pf->makePresence("MessageServicePresence");
02196     }
02197     else {
02198       LOG4CPLUS_ERROR(getApplicationLogger(),
02199           "Unable to recreate message service presence ");
02200     }
02201   }
02202   catch(...) {
02203     LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception in MessageServicePresence");
02204   }
02205 
02206   edm_init_done_=true;
02207 }
02208 
02209 bool FUEventProcessor::enableForkInEDM() 
02210 {
02211   evtProcessor_.resetWaiting();
02212   try {
02213     //set to connect to Shm later
02214     //if(hasShMem_) setAttachDqmToShm();
02215 
02216     int sc = 0;
02217     //maybe not needed in MP mode
02218     evtProcessor_->clearCounters();
02219     if(isRunNumberSetter_)
02220       evtProcessor_->setRunNumber(runNumber_.value_);
02221     else
02222       evtProcessor_->declareRunNumber(runNumber_.value_);
02223 
02224     //prepare object used to communicate with DaqSource
02225     pthread_mutex_destroy(&forkObjLock_);
02226     pthread_mutex_init(&forkObjLock_,0);
02227     if (forkInfoObj_) delete forkInfoObj_;
02228     forkInfoObj_ = new moduleweb::ForkInfoObj();
02229     forkInfoObj_->mst_lock_=&forkObjLock_;
02230     forkInfoObj_->fuAddr=(void*)this;
02231     forkInfoObj_->forkHandler = forkProcessFromEDM_helper;
02232     forkInfoObj_->forkParams.slotId=-1;
02233     forkInfoObj_->forkParams.restart=0;
02234     forkInfoObj_->forkParams.isMaster=-1;
02235     forkInfoObj_->stopCondition=0;
02236     if (mwrRef_)
02237       mwrRef_->publishForkInfo(std::string("DaqSource"),forkInfoObj_);
02238 
02239     evtProcessor_->runAsync();
02240     sc = evtProcessor_->statusAsync();
02241 
02242     if(sc != 0) {
02243       std::ostringstream oss;
02244       oss<<"EventProcessor::runAsync returned status code " << sc;
02245       reasonForFailedState_ = oss.str();
02246       fsm_.fireFailed(reasonForFailedState_,this);
02247       LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02248       return false;
02249     }
02250   }
02251   //catch exceptions on master side
02252   catch(cms::Exception &e) {
02253     reasonForFailedState_ = e.explainSelf();
02254     fsm_.fireFailed(reasonForFailedState_,this);
02255     LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02256     return false;
02257   }    
02258   catch(std::exception &e) {
02259     reasonForFailedState_  = e.what();
02260     fsm_.fireFailed(reasonForFailedState_,this);
02261     LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02262     return false;
02263   }
02264   catch(...) {
02265     reasonForFailedState_ = "Unknown Exception";
02266     fsm_.fireFailed(reasonForFailedState_,this);
02267     LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02268     return false;
02269   }
02270   return true;
02271 }
02272 
02273 bool FUEventProcessor::restartForkInEDM(unsigned int slotId) {
02274   //daqsource will keep this lock until master returns after fork
02275   //so that we don't do another EP restart in between
02276   forkInfoObj_->lock();
02277   forkInfoObj_->forkParams.slotId=slotId;
02278   forkInfoObj_->forkParams.restart=true;
02279   forkInfoObj_->forkParams.isMaster=1;
02280   forkInfoObj_->stopCondition=0;
02281   LOG4CPLUS_DEBUG(log_, " restarting subprocess in slot "<< slotId <<": posting on control semaphore");
02282   sem_post(forkInfoObj_->control_sem_);
02283   forkInfoObj_->unlock();
02284   usleep(1000);
02285   return true;
02286 }
02287 
02288 bool FUEventProcessor::enableClassic()
02289 {
02290   bool retval = enableCommon();
02291   while(evtProcessor_->getState()!= edm::event_processor::sRunning){
02292     LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
02293     ::sleep(1);
02294   }
02295   
02296   //  implementation moved to EPWrapper
02297   //  startScalersWorkLoop(); // this is now not done any longer 
02298   localLog("-I- Start completed");
02299   return retval;
02300 }
02301 bool FUEventProcessor::enableMPEPSlave()
02302 {
02303   //all this happens only in the child process
02304 
02305   startReceivingLoop();
02306   startReceivingMonitorLoop();
02307   evtProcessor_.resetWaiting();
02308   startScalersWorkLoop();
02309   while(!evtProcessor_.isWaitingForLs())
02310     ::sleep(1);
02311 
02312   // @EM test do not run monitor loop in slave, only receiving&Monitor
02313   //  evtProcessor_.startMonitoringWorkLoop();
02314   try{
02315     //    evtProcessor_.makeServicesOnly();
02316     try{
02317       edm::PresenceFactory *pf = edm::PresenceFactory::get();
02318       if(pf != 0) {
02319         pf->makePresence("MessageServicePresence").release();
02320       }
02321       else {
02322         LOG4CPLUS_ERROR(getApplicationLogger(),
02323                         "Unable to create message service presence ");
02324       }
02325     } 
02326     catch(...) {
02327       LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
02328     }
02329   ML::MLlog4cplus::setAppl(this);
02330   }       
02331   catch (xcept::Exception &e) {
02332     reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
02333     fsm_.fireFailed(reasonForFailedState_,this);
02334     localLog(reasonForFailedState_);
02335   }
02336   bool retval =  enableCommon();
02337   //  while(evtProcessor_->getState()!= edm::event_processor::sRunning){
02338   //    LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
02339   //    ::sleep(1);
02340   //  }
02341   return retval;
02342 }
02343 
02344 bool FUEventProcessor::stopClassic()
02345 {
02346   bool failed=false;
02347   try {
02348     LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping :) ...");
02349     edm::EventProcessor::StatusCode rc = evtProcessor_.stop();
02350     if(rc == edm::EventProcessor::epSuccess) 
02351       fsm_.fireEvent("StopDone",this);
02352     else
02353       {
02354         failed=true;
02355         //      epMState_ = evtProcessor_->currentStateName();
02356         if(rc == edm::EventProcessor::epTimedOut)
02357           reasonForFailedState_ = "EventProcessor stop timed out";
02358         else
02359           reasonForFailedState_ = "EventProcessor did not receive STOP event";
02360       }
02361   }
02362   catch (xcept::Exception &e) {
02363     failed=true;
02364     reasonForFailedState_ = "Stopping FAILED: " + (std::string)e.what();
02365   }
02366   catch (edm::Exception &e) {
02367     failed=true;
02368     reasonForFailedState_ = "Stopping FAILED: " + (std::string)e.what();
02369   }
02370   catch (...) {
02371     failed=true;
02372     reasonForFailedState_= "Stopping FAILED: unknown exception";
02373   }
02374   try {
02375     if (hasShMem_) {
02376       detachDqmFromShm();
02377       if (failed) 
02378         LOG4CPLUS_WARN(getApplicationLogger(), 
02379             "In failed STOP - success detaching DQM from Shm. pid:" << getpid());
02380     }
02381   }
02382   catch (cms::Exception & e) {
02383     failed=true;
02384     reasonForFailedState_= "Stopping FAILED: " + (std::string)e.what();
02385   }
02386   catch (...) {
02387     failed=true;
02388     reasonForFailedState_= "DQM detach failed: Unknown exception";
02389   }
02390 
02391   if (failed) {
02392     LOG4CPLUS_FATAL(getApplicationLogger(),"STOP failed: "
02393         << reasonForFailedState_ << " (pid:" << getpid()<<")");
02394     localLog(reasonForFailedState_);
02395     fsm_.fireFailed(reasonForFailedState_,this);
02396   }
02397 
02398   LOG4CPLUS_INFO(getApplicationLogger(),"Finished stopping!");
02399   localLog("-I- Stop completed");
02400   return false;
02401 }
02402 
02403 void FUEventProcessor::stopSlavesAndAcknowledge()
02404 {
02405   MsgBuf msg(0,MSQM_MESSAGE_TYPE_STOP);
02406   MsgBuf msg1(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_STOP);
02407 
02408   std::vector<bool> processes_to_stop(nbSubProcesses_.value_,false);
02409   for(unsigned int i = 0; i < subs_.size(); i++)
02410     {
02411       pthread_mutex_lock(&stop_lock_);
02412       if(subs_[i].alive()>0){
02413         processes_to_stop[i] = true;
02414         subs_[i].post(msg,false);
02415       }
02416       pthread_mutex_unlock(&stop_lock_);
02417     }
02418   for(unsigned int i = 0; i < subs_.size(); i++)
02419     {
02420       pthread_mutex_lock(&stop_lock_);
02421       if(processes_to_stop[i]){
02422         try{
02423           subs_[i].rcv(msg1,false);
02424         }
02425         catch(evf::Exception &e){
02426           std::ostringstream ost;
02427           ost << "failed to get STOP - errno ->" << errno << " " << e.what(); 
02428           reasonForFailedState_ = ost.str();
02429           LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02430           //      fsm_.fireFailed(reasonForFailedState_,this);
02431           localLog(reasonForFailedState_);
02432           pthread_mutex_unlock(&stop_lock_);
02433           continue;
02434         }
02435       }
02436       else {
02437         pthread_mutex_unlock(&stop_lock_);
02438         continue;
02439       }
02440       pthread_mutex_unlock(&stop_lock_);
02441       if(msg1->mtype==MSQS_MESSAGE_TYPE_STOP)
02442         while(subs_[i].alive()>0) ::usleep(10000);
02443       subs_[i].disconnect();
02444     }
02445   //  subs_.clear();
02446 
02447 }
02448 
02449 void FUEventProcessor::microState(xgi::Input *in,xgi::Output *out)
02450 {
02451   std::string urn = getApplicationDescriptor()->getURN();
02452   try{
02453     evtProcessor_.stateNameFromIndex(0);
02454     evtProcessor_.moduleNameFromIndex(0);
02455   if(myProcess_) {std::cout << "microstate called for child! bail out" << std::endl; return;}
02456   *out << "<tr><td>" << fsm_.stateName()->toString() 
02457        << "</td><td>"<< (myProcess_ ? "S" : "M") <<"</td><td>" << nblive_ << "</td><td>"
02458        << nbdead_ << "</td><td><a href=\"/" << urn << "/procStat\">" << getpid() <<"</a></td>";
02459   evtProcessor_.microState(in,out);
02460   *out << "<td></td><td>" << nbTotalDQM_ 
02461        << "</td><td>" << evtProcessor_.getScalersUpdates() << "</td></tr>";
02462   if(nbSubProcesses_.value_!=0 && !myProcess_) 
02463     {
02464       pthread_mutex_lock(&start_lock_);
02465       for(unsigned int i = 0; i < subs_.size(); i++)
02466         {
02467           try{
02468             if(subs_[i].alive()>0)
02469               {
02470                 *out << "<tr><td  bgcolor=\"#00FF00\" id=\"a"
02471                      << i << "\">""Alive</td><td>S</td><td>"
02472                      << subs_[i].queueId() << "<td>" 
02473                      << subs_[i].queueStatus()<< "/"
02474                      << subs_[i].queueOccupancy() << "/"
02475                      << subs_[i].queuePidOfLastSend() << "/"
02476                      << subs_[i].queuePidOfLastReceive() 
02477                      << "</td><td><a id=\"p"<< i << "\" href=\"SubWeb?process=" 
02478                      << subs_[i].pid() << "&method=procStat\">" 
02479                      << subs_[i].pid()<<"</a></td>" //<< msg->mtext;
02480                      << "<td>" << evtProcessor_.stateNameFromIndex(subs_[i].params().Ms) << "</td><td>" 
02481                      << evtProcessor_.moduleNameFromIndex(subs_[i].params().ms) << "</td><td>" 
02482                      << subs_[i].params().nba << "/" << subs_[i].params().nbp 
02483                      << " (" << float(subs_[i].params().nba)/float(subs_[i].params().nbp)*100. <<"%)" 
02484                      << "</td><td>" << subs_[i].params().ls  << "/" << subs_[i].params().ls 
02485                      << "</td><td>" << subs_[i].params().ps 
02486                      << "</td><td" 
02487                      << ((subs_[i].params().eols<subs_[i].params().ls) ? " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")  << ">" 
02488                      << subs_[i].params().eols  
02489                      << "</td><td>" << subs_[i].params().dqm 
02490                      << "</td><td>" << subs_[i].params().trp << "</td>";
02491               }
02492             else 
02493               {
02494                 pthread_mutex_lock(&pickup_lock_);
02495                 *out << "<tr><td id=\"a"<< i << "\" ";
02496                 if(subs_[i].alive()==-1000)
02497                   *out << " bgcolor=\"#bbaabb\">NotInitialized";
02498                 else
02499                   *out << (subs_[i].alive()==0 ? ">Done" : " bgcolor=\"#FF0000\">Dead");
02500                 *out << "</td><td>S</td><td>"<< subs_[i].queueId() << "<td>" 
02501                      << subs_[i].queueStatus() << "/"
02502                      << subs_[i].queueOccupancy() << "/"
02503                      << subs_[i].queuePidOfLastSend() << "/"
02504                      << subs_[i].queuePidOfLastReceive() 
02505                      << "</td><td id=\"p"<< i << "\">"
02506                      <<subs_[i].pid()<<"</td><td colspan=\"5\">" << subs_[i].reasonForFailed();
02507                 if(subs_[i].alive()!=0 && subs_[i].alive()!=-1000) 
02508                   {
02509                     if(autoRestartSlaves_ && subs_[i].restartCount()<=2) 
02510                       *out << " will restart in " << subs_[i].countdown() << " s";
02511                     else if(autoRestartSlaves_)
02512                       *out << " reached maximum restart count";
02513                     else *out << " autoRestart is disabled ";
02514                   }
02515                 *out << "</td><td" 
02516                      << ((subs_[i].params().eols<subs_[i].params().ls) ? 
02517                          " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")  
02518                      << ">" 
02519                      << subs_[i].params().eols  
02520                      << "</td><td>" << subs_[i].params().dqm 
02521                      << "</td><td>" << subs_[i].params().trp << "</td>";
02522                 pthread_mutex_unlock(&pickup_lock_);
02523               }
02524             *out << "</tr>";
02525           }
02526           catch(evf::Exception &e){
02527             *out << "<tr><td id=\"a"<< i << "\" " 
02528                  <<"bgcolor=\"#FFFF00\">NotResp</td><td>S</td><td>"<< subs_[i].queueId() << "<td>" 
02529                  << subs_[i].queueStatus() << "/"
02530                  << subs_[i].queueOccupancy() << "/"
02531                  << subs_[i].queuePidOfLastSend() << "/"
02532                  << subs_[i].queuePidOfLastReceive() 
02533                  << "</td><td id=\"p"<< i << "\">"
02534                  <<subs_[i].pid()<<"</td>";
02535           }
02536         }
02537       pthread_mutex_unlock(&start_lock_); 
02538     }
02539   }
02540       catch(evf::Exception &e)
02541       {
02542         LOG4CPLUS_INFO(getApplicationLogger(),"evf::Exception caught in microstate - " << e.what());    
02543       }
02544     catch(cms::Exception &e)
02545       {
02546         LOG4CPLUS_INFO(getApplicationLogger(),"cms::Exception caught in microstate - " << e.what());    
02547       }
02548     catch(std::exception &e)
02549       {
02550         LOG4CPLUS_INFO(getApplicationLogger(),"std::Exception caught in microstate - " << e.what());    
02551       }
02552     catch(...)
02553       {
02554         LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception caught in microstate - ");    
02555       }
02556 
02557 }
02558 
02559 
02560 void FUEventProcessor::updater(xgi::Input *in,xgi::Output *out)
02561 {
02562   using namespace utils;
02563 
02564   *out << updaterStatic_;
02565   mDiv(out,"loads");
02566   uptime(out);
02567   cDiv(out);
02568   mDiv(out,"st",fsm_.stateName()->toString());
02569   mDiv(out,"ru",runNumber_.toString());
02570   mDiv(out,"nsl",nbSubProcesses_.value_);
02571   mDiv(out,"nsr",nbSubProcessesReporting_.value_);
02572   mDiv(out,"cl");
02573   *out << getApplicationDescriptor()->getClassName() 
02574        << (nbSubProcesses_.value_ > 0 ? "MP " : " ");
02575   cDiv(out);
02576   mDiv(out,"in",getApplicationDescriptor()->getInstance());
02577   if(fsm_.stateName()->toString() != "Halted" && fsm_.stateName()->toString() != "halting"){
02578     mDiv(out,"hlt");
02579     *out << "<a href=\"" << configString_.toString() << "\">HLT Config</a>";
02580     cDiv(out);
02581     *out << std::endl;
02582   }
02583   else
02584     mDiv(out,"hlt","Not yet...");
02585 
02586   mDiv(out,"sq",squidPresent_.toString());
02587   mDiv(out,"vwl",(supervising_ ? "Active" : "Not Initialized"));
02588   mDiv(out,"mwl",evtProcessor_.wlMonitoring());
02589   if(nbProcessed != 0 && nbAccepted != 0)
02590     {
02591       mDiv(out,"tt",((xdata::UnsignedInteger32*)nbProcessed)->value_);
02592       mDiv(out,"ac",((xdata::UnsignedInteger32*)nbAccepted)->value_);
02593     }
02594   else
02595     {
02596       mDiv(out,"tt",0);
02597       mDiv(out,"ac",0);
02598     }
02599   if(!myProcess_)
02600     mDiv(out,"swl",(wlSummarizeActive_ ? "Active" : "Inactive"));
02601   else
02602     mDiv(out,"swl",(wlScalersActive_ ? "Active" : "Inactive"));
02603 
02604   mDiv(out,"idi",iDieUrl_.value_);
02605   if(vp_!=0){
02606     mDiv(out,"vpi",(unsigned int) vp_);
02607     if(vulture_->hasStarted()>=0)
02608       mDiv(out,"vul","Prowling");
02609     else
02610       mDiv(out,"vul","Dead");
02611   }
02612   else{
02613     mDiv(out,"vul",(vulture_==0 ? "Nope" : "Hatching"));
02614   }    
02615   if(evtProcessor_){
02616     mDiv(out,"ll");
02617     *out << evtProcessor_.lastLumi().ls
02618          << "," << evtProcessor_.lastLumi().proc << "," << evtProcessor_.lastLumi().acc;
02619     cDiv(out);
02620   }
02621   mDiv(out,"lg");
02622   for(unsigned int i = logRingIndex_; i<logRingSize_; i++)
02623     *out << logRing_[i] << std::endl;
02624   if(logWrap_)
02625     for(unsigned int i = 0; i<logRingIndex_; i++)
02626       *out << logRing_[i] << std::endl;
02627   cDiv(out);
02628   mDiv(out,"cha");
02629   if(cpustat_) *out << cpustat_->getChart();
02630   cDiv(out);
02631 }
02632 
02633 void FUEventProcessor::procStat(xgi::Input *in, xgi::Output *out)
02634 {
02635   evf::utils::procStat(out);
02636 }
02637 
02638 void FUEventProcessor::sendMessageOverMonitorQueue(MsgBuf &buf)
02639 {
02640   if(myProcess_) myProcess_->postSlave(buf,true);
02641 }
02642 
02643 void FUEventProcessor::makeStaticInfo()
02644 {
02645   using namespace utils;
02646   std::ostringstream ost;
02647   mDiv(&ost,"ve");
02648   ost<< "$Revision: 1.155 $ (" << edm::getReleaseVersion() <<")";
02649   cDiv(&ost);
02650   mDiv(&ost,"ou",outPut_.toString());
02651   mDiv(&ost,"sh",hasShMem_.toString());
02652   mDiv(&ost,"mw",hasModuleWebRegistry_.toString());
02653   mDiv(&ost,"sw",hasServiceWebRegistry_.toString());
02654   
02655   xdata::Serializable *monsleep = 0;
02656   xdata::Serializable *lstimeout = 0;
02657   try{
02658     monsleep  = applicationInfoSpace_->find("monSleepSec");
02659     lstimeout = applicationInfoSpace_->find("lsTimeOut");
02660   }
02661   catch(xdata::exception::Exception e){
02662   }
02663   
02664   if(monsleep!=0)
02665     mDiv(&ost,"ms",monsleep->toString());
02666   if(lstimeout!=0)
02667     mDiv(&ost,"lst",lstimeout->toString());
02668   char cbuf[sizeof(struct utsname)];
02669   struct utsname* buf = (struct utsname*)cbuf;
02670   uname(buf);
02671   mDiv(&ost,"sysinfo");
02672   ost << buf->sysname << " " << buf->nodename 
02673       << " " << buf->release << " " << buf->version << " " << buf->machine;
02674   cDiv(&ost);
02675   updaterStatic_ = ost.str();
02676 }
02677 
02678 void FUEventProcessor::handleSignalSlave(int sig, siginfo_t* info, void* c)
02679 {
02680   //notify master
02681   sem_post(sigmon_sem_);
02682 
02683   //sleep while master takes action
02684   sleep(2);
02685 
02686   //set up alarm if handler deadlocks on unsafe actions
02687   alarm(5);
02688   
02689   std::cout << "--- Slave EP signal handler caught signal " << sig << " process id is " << info->si_pid <<" ---" << std::endl;
02690   std::cout << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl;
02691   std::cout << "--- Stacktrace follows --" << std::endl;
02692   std::ostringstream stacktr;
02693   toolbox::stacktrace(20,stacktr);
02694   std::cout << stacktr.str();
02695   if (!rlimit_coresize_changed_)
02696     std::cout << "--- Dumping core." <<  " --- " << std::endl;
02697   else
02698     std::cout << "--- Core dump count exceeded on this FU. ---"<<std::endl;
02699  
02700   std::string hasdump = "";
02701   if (rlimit_coresize_changed_) hasdump = " (core dump disabled) ";
02702 
02703   LOG4CPLUS_ERROR(getApplicationLogger(),    "--- Slave EP signal handler caught signal " << sig << ". process id is " << getpid() 
02704                                           << " on node " << toolbox::net::getHostName() << " ---" << std::endl
02705                                           << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl
02706                                           << "--- Stacktrace follows"<< hasdump << " ---" << std::endl << stacktr.str()
02707                                           );
02708 
02709   //re-raise signal with default handler (will cause core dump if enabled)
02710   raise(sig);
02711 }
02712 
02713 
02714 XDAQ_INSTANTIATOR_IMPL(evf::FUEventProcessor)