CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_6/src/EventFilter/Processor/src/FUEventProcessor.cc

Go to the documentation of this file.
00001 
00002 //
00003 // FUEventProcessor
00004 // ----------------
00005 //
00007 
00008 #include "FUEventProcessor.h"
00009 #include "procUtils.h"
00010 #include "EventFilter/Utilities/interface/CPUStat.h"
00011 #include "EventFilter/Utilities/interface/RateStat.h"
00012 
00013 #include "EventFilter/Utilities/interface/Exception.h"
00014 
00015 #include "EventFilter/Message2log4cplus/interface/MLlog4cplus.h"
00016 #include "EventFilter/Modules/interface/FUShmDQMOutputService.h"
00017 #include "EventFilter/Utilities/interface/ServiceWebRegistry.h"
00018 #include "EventFilter/Utilities/interface/ServiceWeb.h"
00019 #include "EventFilter/Utilities/interface/ModuleWeb.h"
00020 #include "EventFilter/Utilities/interface/ModuleWebRegistry.h"
00021 #include "EventFilter/Modules/src/FUShmOutputModule.h"
00022 
00023 #include "FWCore/PluginManager/interface/ProblemTracker.h"
00024 #include "FWCore/PluginManager/interface/PresenceFactory.h"
00025 #include "FWCore/Utilities/interface/Presence.h"
00026 #include "FWCore/Utilities/interface/Exception.h"
00027 #include "FWCore/Version/interface/GetReleaseVersion.h"
00028 #include "FWCore/ServiceRegistry/interface/Service.h"
00029 
00030 #include "toolbox/BSem.h" 
00031 #include "toolbox/Runtime.h"
00032 #include "toolbox/stacktrace.h"
00033 #include "toolbox/net/Utils.h"
00034 
00035 #include <boost/tokenizer.hpp>
00036 
00037 #include "xcept/tools.h"
00038 #include "xgi/Method.h"
00039 
00040 #include "cgicc/CgiDefs.h"
00041 #include "cgicc/Cgicc.h"
00042 #include "cgicc/FormEntry.h"
00043 
00044 
00045 #include <sys/wait.h>
00046 #include <sys/utsname.h>
00047 #include <sys/mman.h>
00048 #include <signal.h>
00049 
00050 #include <typeinfo>
00051 #include <stdlib.h>
00052 #include <stdio.h>
00053 #include <errno.h>
00054 
00055 
00056 using namespace evf;
00057 using namespace cgicc;
00058 namespace toolbox {
00059   namespace mem {
00060     extern toolbox::BSem * _s_mutex_ptr_; 
00061   }
00062 }
00063 
00064 //signal handler (global)
00065 namespace evf {
00066   FUEventProcessor * FUInstancePtr_;
00067   int evfep_raised_signal;
00068   void evfep_sighandler(int sig, siginfo_t* info, void* c)
00069   {
00070     evfep_raised_signal=sig;
00071     FUInstancePtr_->handleSignalSlave(sig, info, c);
00072   }
00073   void evfep_alarmhandler(int sig, siginfo_t* info, void* c)
00074   {
00075     if (evfep_raised_signal) {
00076       signal(evfep_raised_signal,SIG_DFL);
00077       raise(evfep_raised_signal);
00078     }
00079   }
00080 }
00081 
00083 // construction/destruction
00085 
00086 //______________________________________________________________________________
00087 FUEventProcessor::FUEventProcessor(xdaq::ApplicationStub *s) 
00088   : xdaq::Application(s)
00089   , fsm_(this)
00090   , log_(getApplicationLogger())
00091   , evtProcessor_(log_, getApplicationDescriptor()->getInstance())
00092   , runNumber_(0)
00093   , epInitialized_(false)
00094   , outPut_(true)
00095   , autoRestartSlaves_(false)
00096   , slaveRestartDelaySecs_(10)
00097   , hasShMem_(true)
00098   , hasPrescaleService_(true)
00099   , hasModuleWebRegistry_(true)
00100   , hasServiceWebRegistry_(true)
00101   , isRunNumberSetter_(true)
00102   , iDieStatisticsGathering_(false)
00103   , outprev_(true)
00104   , exitOnError_(true)
00105   , reasonForFailedState_()
00106   , squidnet_(3128,"https://localhost:8000/RELEASE-NOTES.txt")
00107   , logRing_(logRingSize_)
00108   , logRingIndex_(logRingSize_)
00109   , logWrap_(false)
00110   , nbSubProcesses_(0)
00111   , nbSubProcessesReporting_(0)
00112   , forkInEDM_(true)
00113   , nblive_(0)
00114   , nbdead_(0)
00115   , nbTotalDQM_(0)
00116   , wlReceiving_(0)
00117   , asReceiveMsgAndExecute_(0)
00118   , receiving_(false) 
00119   , wlReceivingMonitor_(0)
00120   , asReceiveMsgAndRead_(0)
00121   , receivingM_(false)
00122   , myProcess_(0)
00123   , wlSupervising_(0)
00124   , asSupervisor_(0)
00125   , supervising_(false)
00126   , wlSignalMonitor_(0)
00127   , asSignalMonitor_(0)
00128   , signalMonitorActive_(false)
00129   , monitorInfoSpace_(0)
00130   , monitorLegendaInfoSpace_(0)
00131   , applicationInfoSpace_(0)
00132   , nbProcessed(0)
00133   , nbAccepted(0)
00134   , scalersInfoSpace_(0)
00135   , scalersLegendaInfoSpace_(0)
00136   , wlScalers_(0)
00137   , asScalers_(0)
00138   , wlScalersActive_(false)
00139   , scalersUpdates_(0)
00140   , wlSummarize_(0)
00141   , asSummarize_(0)
00142   , wlSummarizeActive_(false)
00143   , superSleepSec_(1)
00144   , iDieUrl_("none")
00145   , vulture_(0)
00146   , vp_(0)
00147   , cpustat_(0)
00148   , ratestat_(0)
00149   , mwrRef_(nullptr)
00150   , sorRef_(nullptr)
00151   , master_message_prg_(0,MSQM_MESSAGE_TYPE_PRG)
00152   , master_message_prr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_PRR)
00153   , slave_message_prr_(sizeof(prg),MSQS_MESSAGE_TYPE_PRR)
00154   , master_message_trr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_TRR)
00155   , edm_init_done_(true)
00156   , crashesThisRun_(false)
00157   , rlimit_coresize_changed_(false)
00158   , crashesToDump_(2)
00159   , sigmon_sem_(0)
00160   , datasetCounting_(true)
00161 {
00162   using namespace utils;
00163 
00164   FUInstancePtr_=this;
00165 
00166   names_.push_back("nbProcessed"    );
00167   names_.push_back("nbAccepted"     );
00168   names_.push_back("epMacroStateInt");
00169   names_.push_back("epMicroStateInt");
00170   // create pipe for web communication
00171   int retpipe = pipe(anonymousPipe_);
00172   if(retpipe != 0)
00173         LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to create pipe");
00174   // check squid presence
00175   squidPresent_ = squidnet_.check();
00176   //pass application parameters to FWEPWrapper
00177   evtProcessor_.setAppDesc(getApplicationDescriptor());
00178   evtProcessor_.setAppCtxt(getApplicationContext());
00179   // bind relevant callbacks to finite state machine
00180   fsm_.initialize<evf::FUEventProcessor>(this);
00181   
00182   //set sourceId_
00183   url_ =
00184     getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00185     getApplicationDescriptor()->getURN();
00186   class_   =getApplicationDescriptor()->getClassName();
00187   instance_=getApplicationDescriptor()->getInstance();
00188   sourceId_=class_.toString()+"_"+instance_.toString();
00189   LOG4CPLUS_INFO(getApplicationLogger(),sourceId_ <<" constructor"         );
00190   LOG4CPLUS_INFO(getApplicationLogger(),"CMSSW_BASE:"<<getenv("CMSSW_BASE"));
00191   
00192   getApplicationDescriptor()->setAttribute("icon", "/evf/images/epicon.jpg");
00193   
00194   xdata::InfoSpace *ispace = getApplicationInfoSpace();
00195   applicationInfoSpace_ = ispace;
00196 
00197   // default configuration
00198   ispace->fireItemAvailable("parameterSet",         &configString_                );
00199   ispace->fireItemAvailable("epInitialized",        &epInitialized_               );
00200   ispace->fireItemAvailable("stateName",             fsm_.stateName()             );
00201   ispace->fireItemAvailable("runNumber",            &runNumber_                   );
00202   ispace->fireItemAvailable("outputEnabled",        &outPut_                      );
00203 
00204   ispace->fireItemAvailable("hasSharedMemory",      &hasShMem_);
00205   ispace->fireItemAvailable("hasPrescaleService",   &hasPrescaleService_          );
00206   ispace->fireItemAvailable("hasModuleWebRegistry", &hasModuleWebRegistry_        );
00207   ispace->fireItemAvailable("hasServiceWebRegistry", &hasServiceWebRegistry_      );
00208   ispace->fireItemAvailable("isRunNumberSetter",    &isRunNumberSetter_           );
00209   ispace->fireItemAvailable("iDieStatisticsGathering",   &iDieStatisticsGathering_);
00210   ispace->fireItemAvailable("rcmsStateListener",     fsm_.rcmsStateListener()     );
00211   ispace->fireItemAvailable("foundRcmsStateListener",fsm_.foundRcmsStateListener());
00212   ispace->fireItemAvailable("nbSubProcesses",       &nbSubProcesses_              );
00213   ispace->fireItemAvailable("nbSubProcessesReporting",&nbSubProcessesReporting_   );
00214   ispace->fireItemAvailable("forkInEDM"             ,&forkInEDM_                  );
00215   ispace->fireItemAvailable("superSleepSec",        &superSleepSec_               );
00216   ispace->fireItemAvailable("autoRestartSlaves",    &autoRestartSlaves_           );
00217   ispace->fireItemAvailable("slaveRestartDelaySecs",&slaveRestartDelaySecs_       );
00218   ispace->fireItemAvailable("iDieUrl",              &iDieUrl_                     );
00219   ispace->fireItemAvailable("crashesToDump"         ,&crashesToDump_              );
00220   ispace->fireItemAvailable("datasetCounting"       ,&datasetCounting_            );
00221 
00222   // Add infospace listeners for exporting data values
00223   getApplicationInfoSpace()->addItemChangedListener("parameterSet",        this);
00224   getApplicationInfoSpace()->addItemChangedListener("outputEnabled",       this);
00225 
00226   // findRcmsStateListener
00227   fsm_.findRcmsStateListener();
00228   
00229   // initialize monitoring infospace
00230 
00231   std::string monInfoSpaceName="evf-eventprocessor-status-monitor";
00232   toolbox::net::URN urn = this->createQualifiedInfoSpace(monInfoSpaceName);
00233   monitorInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00234 
00235   std::string monLegendaInfoSpaceName="evf-eventprocessor-status-legenda";
00236   urn = this->createQualifiedInfoSpace(monLegendaInfoSpaceName);
00237   monitorLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00238 
00239   
00240   monitorInfoSpace_->fireItemAvailable("url",                      &url_            );
00241   monitorInfoSpace_->fireItemAvailable("class",                    &class_          );
00242   monitorInfoSpace_->fireItemAvailable("instance",                 &instance_       );
00243   monitorInfoSpace_->fireItemAvailable("runNumber",                &runNumber_      );
00244   monitorInfoSpace_->fireItemAvailable("stateName",                 fsm_.stateName()); 
00245 
00246   monitorInfoSpace_->fireItemAvailable("squidPresent",             &squidPresent_   );
00247 
00248   std::string scalersInfoSpaceName="evf-eventprocessor-scalers-monitor";
00249   urn = this->createQualifiedInfoSpace(scalersInfoSpaceName);
00250   scalersInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00251 
00252   std::string scalersLegendaInfoSpaceName="evf-eventprocessor-scalers-legenda";
00253   urn = this->createQualifiedInfoSpace(scalersLegendaInfoSpaceName);
00254   scalersLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00255 
00256 
00257 
00258   evtProcessor_.setScalersInfoSpace(scalersInfoSpace_,scalersLegendaInfoSpace_);
00259   scalersInfoSpace_->fireItemAvailable("instance", &instance_);
00260 
00261   evtProcessor_.setApplicationInfoSpace(ispace);
00262   evtProcessor_.setMonitorInfoSpace(monitorInfoSpace_,monitorLegendaInfoSpace_);
00263   evtProcessor_.publishConfigAndMonitorItems(nbSubProcesses_.value_!=0);
00264 
00265   //subprocess state vectors for MP
00266   monitorInfoSpace_->fireItemAvailable("epMacroStateInt",             &spMStates_); 
00267   monitorInfoSpace_->fireItemAvailable("epMicroStateInt",             &spmStates_); 
00268   
00269   // Bind web interface
00270   xgi::bind(this, &FUEventProcessor::css,              "styles.css");
00271   xgi::bind(this, &FUEventProcessor::defaultWebPage,   "Default"   );
00272   xgi::bind(this, &FUEventProcessor::spotlightWebPage, "Spotlight" );
00273   xgi::bind(this, &FUEventProcessor::scalersWeb,       "scalersWeb");
00274   xgi::bind(this, &FUEventProcessor::pathNames,        "pathNames" );
00275   xgi::bind(this, &FUEventProcessor::subWeb,           "SubWeb"    );
00276   xgi::bind(this, &FUEventProcessor::getSlavePids,     "getSlavePids");
00277   xgi::bind(this, &FUEventProcessor::moduleWeb,        "moduleWeb" );
00278   xgi::bind(this, &FUEventProcessor::serviceWeb,       "serviceWeb");
00279   xgi::bind(this, &FUEventProcessor::microState,       "microState");
00280   xgi::bind(this, &FUEventProcessor::updater,          "updater"   );
00281   xgi::bind(this, &FUEventProcessor::procStat,         "procStat"  );
00282 
00283   // instantiate the plugin manager, not referenced here after!
00284 
00285   edm::AssertHandler ah;
00286 
00287   //create Message Service thread and pass ownership to auto_ptr that we destroy before fork
00288   try{
00289     LOG4CPLUS_DEBUG(getApplicationLogger(),
00290                     "Trying to create message service presence ");
00291     edm::PresenceFactory *pf = edm::PresenceFactory::get();
00292     if(pf != 0) {
00293       messageServicePresence_= pf->makePresence("MessageServicePresence");
00294     }
00295     else {
00296       LOG4CPLUS_ERROR(getApplicationLogger(),
00297                       "Unable to create message service presence ");
00298     }
00299   } 
00300   catch(...) {
00301     LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
00302   }
00303   ML::MLlog4cplus::setAppl(this);      
00304 
00305   typedef std::set<xdaq::ApplicationDescriptor*> AppDescSet_t;
00306   typedef AppDescSet_t::iterator                 AppDescIter_t;
00307   
00308   AppDescSet_t rcms=
00309     getApplicationContext()->getDefaultZone()->
00310     getApplicationDescriptors("RCMSStateListener");
00311   if(rcms.size()==0) 
00312     {
00313       LOG4CPLUS_WARN(getApplicationLogger(),
00314                        "MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
00315       //        localLog("-W- MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
00316     }
00317   else
00318     {
00319       AppDescIter_t it = rcms.begin();
00320       evtProcessor_.setRcms(*it);
00321     }
00322   pthread_mutex_init(&start_lock_,0);
00323   pthread_mutex_init(&stop_lock_,0);
00324   pthread_mutex_init(&pickup_lock_,0);
00325 
00326   forkInfoObj_=nullptr;
00327   pthread_mutex_init(&forkObjLock_,0);
00328   makeStaticInfo();
00329   startSupervisorLoop();  
00330 
00331   if(vulture_==0) vulture_ = new Vulture(true);
00332 
00334 
00335   AppDescSet_t setOfiDie=
00336     getApplicationContext()->getDefaultZone()->
00337     getApplicationDescriptors("evf::iDie");
00338   
00339   for (AppDescIter_t it=setOfiDie.begin();it!=setOfiDie.end();++it)
00340     if ((*it)->getInstance()==0) // there has to be only one instance of iDie
00341       iDieUrl_ = (*it)->getContextDescriptor()->getURL() + "/" + (*it)->getURN();
00342 
00343   //save default core file size
00344   getrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00345 
00346   //prepare IPC semaphore for getting the workloop waked up on signal caught in slaves
00347   #ifdef linux
00348   if (sigmon_sem_==0) {
00349     sigmon_sem_ = (sem_t*)mmap(NULL, sizeof(sem_t),
00350         PROT_READ | PROT_WRITE,
00351         MAP_ANONYMOUS | MAP_SHARED, 0, 0);
00352     if (!sigmon_sem_) {
00353       perror("mmap error\n");
00354       std::cout << "mmap error"<<std::endl;
00355     }
00356     else
00357       sem_init(sigmon_sem_,true,0);
00358   }
00359   #endif
00360 }
00361 //___________here ends the *huge* constructor___________________________________
00362 
00363 
00364 //______________________________________________________________________________
00365 FUEventProcessor::~FUEventProcessor()
00366 {
00367   // no longer needed since the wrapper is a member of the class and one can rely on 
00368   // implicit destructor - to be revised - at any rate the most common exit path is via "kill"...
00369   //  if (evtProcessor_) delete evtProcessor_;
00370   if(vulture_ != 0) delete vulture_;
00371 }
00372 
00373 
00375 // implementation of member functions
00377 
00378 
00379 //______________________________________________________________________________
00380 bool FUEventProcessor::configuring(toolbox::task::WorkLoop* wl)
00381 {
00382 //   std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
00383 //          << ((instance_.value_==0) ? 0x8 : 0) << " "
00384 //          << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
00385 //          << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
00386 //          << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
00387   unsigned short smap 
00388     = (datasetCounting_.value_ ? 0x20 : 0 )
00389     + ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00390     + (((instance_.value_%80)==0) ? 0x8 : 0) // have at least one legend per slice
00391     + (hasServiceWebRegistry_.value_ ? 0x4 : 0) 
00392     + (hasModuleWebRegistry_.value_ ? 0x2 : 0) 
00393     + (hasPrescaleService_.value_ ? 0x1 : 0);
00394   if(nbSubProcesses_.value_==0) 
00395     {
00396       spMStates_.setSize(1); 
00397       spmStates_.setSize(1); 
00398     }
00399   else
00400     {
00401       spMStates_.setSize(nbSubProcesses_.value_);
00402       spmStates_.setSize(nbSubProcesses_.value_);
00403       for(unsigned int i = 0; i < spMStates_.size(); i++)
00404         {
00405           spMStates_[i] = edm::event_processor::sInit; 
00406           spmStates_[i] = 0; 
00407         }
00408     }
00409   try {
00410     LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ...");
00411     std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00412     epInitialized_=true;
00413     if(evtProcessor_)
00414       {
00415         //get ref of mwr
00416         mwrRef_ = evtProcessor_.getModuleWebRegistry();
00417         sorRef_ = evtProcessor_.getShmOutputModuleRegistry();
00418         // moved to wrapper class
00419         configuration_ = evtProcessor_.configuration();
00420         if(nbSubProcesses_.value_==0) evtProcessor_.startMonitoringWorkLoop(); 
00421         evtProcessor_->beginJob(); 
00422         if(cpustat_) {delete cpustat_; cpustat_=0;}
00423         cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00424                                nbSubProcesses_.value_,
00425                                instance_.value_,
00426                                iDieUrl_.value_);
00427         if(ratestat_) {delete ratestat_; ratestat_=0;}
00428         ratestat_ = new RateStat(iDieUrl_.value_);
00429         if(iDieStatisticsGathering_.value_)
00430         {
00431           try
00432           {
00433             cpustat_->sendLegenda(evtProcessor_.getmicromap());
00434             xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00435             if(legenda !=0)
00436             {
00437               std::string slegenda = ((xdata::String*)legenda)->value_;
00438               ratestat_->sendLegenda(slegenda);
00439             }
00440             if (sorRef_ && datasetCounting_.value_)
00441             {
00442               xdata::String dsLegenda =  sorRef_->getDatasetCSV();
00443               if (dsLegenda.value_.size())
00444                 ratestat_->sendAuxLegenda(dsLegenda);
00445             }
00446           }
00447           catch(evf::Exception &e)
00448           {
00449             LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
00450                 << e.what());
00451           }
00452           catch (xcept::Exception& e) {
00453             LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda."
00454                 << e.what());
00455           }
00456         }
00457 
00458         fsm_.fireEvent("ConfigureDone",this);
00459         LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!");
00460         localLog("-I- Configuration completed");
00461 
00462       }
00463   }
00464   catch (xcept::Exception &e) {
00465     reasonForFailedState_ = "configuring FAILED: " + (std::string)e.what();
00466     fsm_.fireFailed(reasonForFailedState_,this);
00467     localLog(reasonForFailedState_);
00468   }
00469   catch(cms::Exception &e) {
00470     reasonForFailedState_ = e.explainSelf();
00471     fsm_.fireFailed(reasonForFailedState_,this);
00472     localLog(reasonForFailedState_);
00473   }    
00474   catch(std::exception &e) {
00475     reasonForFailedState_ = e.what();
00476     fsm_.fireFailed(reasonForFailedState_,this);
00477     localLog(reasonForFailedState_);
00478   }
00479   catch(...) {
00480     fsm_.fireFailed("Unknown Exception",this);
00481   }
00482 
00483   if(vulture_!=0 && vp_ == 0) vp_ = vulture_->makeProcess();
00484 
00485   return false;
00486 }
00487 
00488 
00489 
00490 
00491 //______________________________________________________________________________
00492 bool FUEventProcessor::enabling(toolbox::task::WorkLoop* wl)
00493 {
00494   nbTotalDQM_ = 0;
00495   scalersUpdates_ = 0;
00496   idleProcStats_ = 0;
00497   allProcStats_ = 0;
00498 //   std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
00499 //          << ((instance_.value_==0) ? 0x8 : 0) << " "
00500 //          << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
00501 //          << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
00502 //          << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
00503   unsigned short smap 
00504     = (datasetCounting_.value_ ? 0x20 : 0 )
00505     + ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00506     + (((instance_.value_%80)==0) ? 0x8 : 0) // have at least one legend per slice
00507     + (hasServiceWebRegistry_.value_ ? 0x4 : 0) 
00508     + (hasModuleWebRegistry_.value_ ? 0x2 : 0) 
00509     + (hasPrescaleService_.value_ ? 0x1 : 0);
00510 
00511   LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling...");
00512   
00513   //reset core limit size
00514   if (rlimit_coresize_changed_)
00515     setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00516   rlimit_coresize_changed_=false;
00517   crashesThisRun_=0;
00518   //recreate signal monitor sem
00519   sem_destroy(sigmon_sem_);
00520   sem_init(sigmon_sem_,true,0);
00521 
00522   if(!epInitialized_){
00523     evtProcessor_.forceInitEventProcessorMaybe();
00524   }
00525   std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00526   
00527   mwrRef_ = evtProcessor_.getModuleWebRegistry();
00528   sorRef_ = evtProcessor_.getShmOutputModuleRegistry();
00529 
00530   if(!epInitialized_){
00531     evtProcessor_->beginJob(); 
00532     if(cpustat_) {delete cpustat_; cpustat_=0;}
00533     cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00534                            nbSubProcesses_.value_,
00535                            instance_.value_,
00536                            iDieUrl_.value_);
00537     if(ratestat_) {delete ratestat_; ratestat_=0;}
00538     ratestat_ = new RateStat(iDieUrl_.value_);
00539     if(iDieStatisticsGathering_.value_)
00540     {
00541       try
00542       {
00543         cpustat_->sendLegenda(evtProcessor_.getmicromap());
00544         xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00545         if(legenda !=0)
00546         {
00547           std::string slegenda = ((xdata::String*)legenda)->value_;
00548           ratestat_->sendLegenda(slegenda);
00549         }
00550         if (sorRef_ && datasetCounting_.value_)
00551         {
00552           xdata::String dsLegenda =  sorRef_->getDatasetCSV();
00553           if (dsLegenda.value_.size())
00554             ratestat_->sendAuxLegenda(dsLegenda);
00555         }
00556       }
00557       catch(evf::Exception &e)
00558       {
00559         LOG4CPLUS_INFO(getApplicationLogger(),"could not send legenda"
00560             << e.what());
00561       }
00562       catch (xcept::Exception& e) {
00563         LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda."
00564             << e.what());
00565       }
00566     }
00567     epInitialized_ = true;
00568   }
00569   configuration_ = evtProcessor_.configuration(); // get it again after init has been carried out...
00570   evtProcessor_.resetLumiSectionReferenceIndex();
00571   //classic appl will return here 
00572   if(nbSubProcesses_.value_==0) return enableClassic();
00573   //protect manipulation of subprocess array
00574   pthread_mutex_lock(&start_lock_);
00575   pthread_mutex_lock(&pickup_lock_);
00576   subs_.clear();
00577   subs_.resize(nbSubProcesses_.value_); // this should not be necessary
00578   pid_t retval = -1;
00579 
00580   for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00581     {
00582       subs_[i]=SubProcess(i,retval); //this will replace all the scattered variables
00583     }
00584   pthread_mutex_unlock(&pickup_lock_);
00585 
00586   pthread_mutex_unlock(&start_lock_);
00587 
00588   //set expected number of child EP's for the Init message(s) sent to the SM
00589   try {
00590     if (sorRef_) {
00591       unsigned int nbExpectedEPWriters = nbSubProcesses_.value_;
00592       if (nbExpectedEPWriters==0) nbExpectedEPWriters=1;//master instance processing
00593       std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
00594       for (unsigned int i=0;i<shmOutputs.size();i++) {
00595         shmOutputs[i]->setNExpectedEPs(nbExpectedEPWriters);
00596       }
00597     }
00598   }
00599   catch (...)
00600   {
00601     LOG4CPLUS_ERROR(getApplicationLogger(),"Thrown Exception while setting nExpectedEPs in shmOutputs");
00602   }
00603 
00604   //use new method if configured
00605   edm_init_done_=true;
00606   if (forkInEDM_.value_) {
00607     edm_init_done_=false;
00608     enableForkInEDM();
00609   }
00610   else
00611     for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00612     {
00613       retval = subs_[i].forkNew();
00614       if(retval==0)
00615         {
00616           myProcess_ = &subs_[i];
00617           // dirty hack: delete/recreate global binary semaphore for later use in child
00618           delete toolbox::mem::_s_mutex_ptr_;
00619           toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
00620           int retval = pthread_mutex_destroy(&stop_lock_);
00621           if(retval != 0) perror("error");
00622           retval = pthread_mutex_init(&stop_lock_,0);
00623           if(retval != 0) perror("error");
00624           fsm_.disableRcmsStateNotification();
00625           
00626           return enableMPEPSlave();
00627           // the loop is broken in the child 
00628         }
00629     }
00630 
00631   if (forkInEDM_.value_) {
00632 
00633     edm::event_processor::State st;
00634     while (!edm_init_done_) {
00635       usleep(10000);
00636       st = evtProcessor_->getState();
00637       if (st==edm::event_processor::sError || st==edm::event_processor::sInvalid) break;
00638     }
00639     st = evtProcessor_->getState();
00640     //error handling: EP must fork during sRunning
00641     if (st!=edm::event_processor::sRunning) {
00642       reasonForFailedState_ = std::string("Master edm::EventProcessor in state ") + evtProcessor_->stateName(st);
00643       localLog(reasonForFailedState_);
00644       fsm_.fireFailed(reasonForFailedState_,this);
00645       return false;
00646     }
00647 
00648     sleep(1);
00649     startSummarizeWorkLoop();
00650     startSignalMonitorWorkLoop();//only with new forking
00651     vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00652 
00653     //enable after we are done with conditions loading and forking
00654     LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00655     fsm_.fireEvent("EnableDone",this);
00656     localLog("-I- Start completed");
00657     return false;
00658   }
00659 
00660   startSummarizeWorkLoop();
00661   vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00662   LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00663   fsm_.fireEvent("EnableDone",this);
00664   localLog("-I- Start completed");
00665   return false;
00666 }
00667 
00668 bool FUEventProcessor::doEndRunInEDM() {
00669   //taking care that EP in master is in stoppable state
00670   if (forkInfoObj_) {
00671 
00672     int count = 30;
00673     bool waitedForEDM=false;
00674     while (!edm_init_done_ && count) {
00675       ::sleep(1);
00676       if (count%5==0)
00677         LOG4CPLUS_WARN(log_,"MASTER EP: Stopping while EP busy in beginRun. waiting " <<count<< "sec");
00678       count--;
00679       waitedForEDM=true;
00680     }
00681     //sleep a few more seconds it was early stop
00682     if (waitedForEDM) sleep(5);
00683 
00684     //if (count==0) fsm_.fireFailed("failed to stop Master EP",this);
00685 
00686     if (evtProcessor_->getState()==edm::event_processor::sJobReady)
00687       return true;//we are already done
00688 
00689     forkInfoObj_->lock();
00690     forkInfoObj_->stopCondition=true;
00691     sem_post(forkInfoObj_->control_sem_);
00692     forkInfoObj_->unlock();
00693 
00694     count = 30;
00695 
00696     edm::event_processor::State st;
00697     while(count--) {
00698       st = evtProcessor_->getState();
00699       if (st==edm::event_processor::sRunning) ::usleep(100000);
00700       else if (st==edm::event_processor::sStopping || st==edm::event_processor::sJobReady) {
00701         break;
00702       }
00703       else {
00704         std::ostringstream ost;
00705         ost << "Master edm::EventProcessor is in state "<< evtProcessor_->stateName(st) << " while stopping";
00706         LOG4CPLUS_ERROR(getApplicationLogger(),ost.str());
00707         fsm_.fireFailed(ost.str(),this);
00708         return false;
00709       }
00710       if (count%5==0 && st==edm::event_processor::sRunning && !forkInfoObj_->receivedStop_) {
00711         forkInfoObj_->lock();
00712         forkInfoObj_->stopCondition=true;
00713         sem_post(forkInfoObj_->control_sem_);
00714         forkInfoObj_->unlock();
00715         LOG4CPLUS_WARN(getApplicationLogger(),
00716           "Master edm::EventProcessor still running after "<< (30-count-1) << " seconds. \"sem_post\" was executed again" );
00717       }
00718     }
00719     if (count<0) {
00720       std::ostringstream ost;
00721       if (!forkInfoObj_->receivedStop_)
00722         ost << "Timeout waiting for Master edm::EventProcessor to go stopping state "
00723             << evtProcessor_->stateName(st) << ": input source did not receive stop signal!";
00724       else
00725         ost << "Timeout waiting for Master edm::EventProcessor to go stopping state "<<evtProcessor_->stateName(st);
00726       LOG4CPLUS_ERROR(getApplicationLogger(),ost.str());
00727       fsm_.fireFailed(ost.str(),this);
00728       return false;
00729     }
00730   }
00731   return true;
00732 }
00733 
00734 //______________________________________________________________________________
00735 bool FUEventProcessor::stopping(toolbox::task::WorkLoop* wl)
00736 {
00737   setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00738   if(nbSubProcesses_.value_!=0) {
00739     stopSlavesAndAcknowledge();
00740     if (forkInEDM_.value_) {
00741             //only in new forking for now
00742             sem_post(sigmon_sem_);
00743             doEndRunInEDM();
00744     }
00745   }
00746   vulture_->stop();
00747 
00748   if (forkInEDM_.value_) {
00749     //shared memory was already disconnected in master
00750     bool tmpHasShMem_=hasShMem_;
00751     hasShMem_=false;
00752     stopClassic();
00753     hasShMem_=tmpHasShMem_;
00754     return false;
00755   }
00756   stopClassic();
00757   return false;
00758 }
00759 
00760 
00761 //______________________________________________________________________________
00762 bool FUEventProcessor::halting(toolbox::task::WorkLoop* wl)
00763 {
00764   LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ...");
00765   setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00766   if(nbSubProcesses_.value_!=0) { 
00767     stopSlavesAndAcknowledge();
00768     if (forkInEDM_.value_) {
00769             sem_post(sigmon_sem_);
00770             doEndRunInEDM();
00771     }
00772   }
00773   try{
00774     evtProcessor_.stopAndHalt();
00775     //cleanup forking variables
00776     if (forkInfoObj_) {
00777       delete forkInfoObj_;
00778       forkInfoObj_=0;
00779     }
00780   }
00781   catch (evf::Exception &e) {
00782     reasonForFailedState_ = "halting FAILED: " + (std::string)e.what();
00783     localLog(reasonForFailedState_);
00784     fsm_.fireFailed(reasonForFailedState_,this);
00785   }
00786   //  if(hasShMem_) detachDqmFromShm();
00787 
00788   LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!");
00789   fsm_.fireEvent("HaltDone",this);
00790 
00791   localLog("-I- Halt completed");
00792   return false;
00793 }
00794 
00795 
00796 //______________________________________________________________________________
00797 xoap::MessageReference FUEventProcessor::fsmCallback(xoap::MessageReference msg)
00798   throw (xoap::exception::Exception)
00799 {
00800   return fsm_.commandCallback(msg);
00801 }
00802 
00803 
00804 //______________________________________________________________________________
00805 void FUEventProcessor::actionPerformed(xdata::Event& e)
00806 {
00807 
00808   if (e.type()=="ItemChangedEvent" && fsm_.stateName()->toString()!="Halted") {
00809     std::string item = dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00810     
00811     if ( item == "parameterSet") {
00812       LOG4CPLUS_WARN(getApplicationLogger(),
00813                      "HLT Menu changed, force reinitialization of EventProcessor");
00814       epInitialized_ = false;
00815     }
00816     if ( item == "outputEnabled") {
00817       if(outprev_ != outPut_) {
00818         LOG4CPLUS_WARN(getApplicationLogger(),
00819                        (outprev_ ? "Disabling " : "Enabling ")<<"global output");
00820         evtProcessor_->enableEndPaths(outPut_);
00821         outprev_ = outPut_;
00822       }
00823     }
00824     if (item == "globalInputPrescale") {
00825       LOG4CPLUS_WARN(getApplicationLogger(),
00826                      "Setting global input prescale has no effect "
00827                      <<"in this version of the code");
00828     }
00829     if ( item == "globalOutputPrescale") {
00830       LOG4CPLUS_WARN(getApplicationLogger(),
00831                      "Setting global output prescale has no effect "
00832                      <<"in this version of the code");
00833     }
00834   }
00835   
00836 }
00837 
00838 //______________________________________________________________________________
00839 void FUEventProcessor::getSlavePids(xgi::Input  *in, xgi::Output *out)
00840 {
00841   for (unsigned int i=0;i<subs_.size();i++)
00842   {
00843     if (i!=0) *out << ",";
00844     *out << subs_[i].pid();
00845   }
00846 }
00847 //______________________________________________________________________________
00848 void FUEventProcessor::subWeb(xgi::Input  *in, xgi::Output *out)
00849 {
00850   using namespace cgicc;
00851   pid_t pid = 0;
00852   std::ostringstream ost;
00853   ost << "&";
00854 
00855   Cgicc cgi(in);
00856   internal::MyCgi *mycgi = (internal::MyCgi*)in;
00857   for(std::map<std::string, std::string, std::less<std::string> >::iterator mit = 
00858         mycgi->getEnvironment().begin();
00859       mit != mycgi->getEnvironment().end(); mit++)
00860     ost << mit->first << "%" << mit->second << ";";
00861   std::vector<FormEntry> els = cgi.getElements() ;
00862   std::vector<FormEntry> el1;
00863   cgi.getElement("method",el1);
00864   std::vector<FormEntry> el2;
00865   cgi.getElement("process",el2);
00866   if(el1.size()!=0) {
00867     std::string meth = el1[0].getValue();
00868     if(el2.size()!=0) {
00869       unsigned int i = 0;
00870       std::string mod = el2[0].getValue();
00871       pid = atoi(mod.c_str()); // get the process id to be polled
00872       for(; i < subs_.size(); i++)
00873         if(subs_[i].pid()==pid) break;
00874       if(i>=subs_.size()){ //process was not found, let the browser know
00875         *out << "ERROR 404 : Process " << pid << " Not Found !" << std::endl;
00876         return;
00877       } 
00878       if(subs_[i].alive() != 1){
00879         *out << "ERROR 405 : Process " << pid << " Not Alive !" << std::endl;
00880         return;
00881       }
00882       MsgBuf msg1(meth.length()+ost.str().length()+1,MSQM_MESSAGE_TYPE_WEB);
00883       strncpy(msg1->mtext,meth.c_str(),meth.length());
00884       strncpy(msg1->mtext+meth.length(),ost.str().c_str(),ost.str().length());
00885       subs_[i].post(msg1,true);
00886       unsigned int keep_supersleep_original_value = superSleepSec_.value_;
00887       superSleepSec_.value_=10*keep_supersleep_original_value;
00888       MsgBuf msg(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_WEB);
00889       bool done = false;
00890       std::vector<char *>pieces;
00891       while(!done){
00892         unsigned long retval1 = subs_[i].rcvNonBlocking(msg,true);
00893         if(retval1 == MSGQ_MESSAGE_TYPE_RANGE){
00894 	  ::sleep(1);
00895           continue;
00896         }
00897         unsigned int nbytes = atoi(msg->mtext);
00898         if(nbytes < MAX_PIPE_BUFFER_SIZE) done = true; // this will break the while loop
00899         char *buf= new char[nbytes];
00900         ssize_t retval = read(anonymousPipe_[PIPE_READ],buf,nbytes);
00901         if(retval!=nbytes) std::cout 
00902           << "CAREFUL HERE, read less bytes than expected from pipe in subWeb" << std::endl;
00903         pieces.push_back(buf);
00904       }
00905       superSleepSec_.value_=keep_supersleep_original_value;
00906       for(unsigned int j = 0; j < pieces.size(); j++){
00907         *out<<pieces[j];    // chain the buffers into the output strstream
00908         delete[] pieces[j]; //make sure to release all buffers used for reading the pipe
00909       }
00910     }
00911   }
00912 }
00913 
00914 
00915 //______________________________________________________________________________
00916 void FUEventProcessor::defaultWebPage(xgi::Input  *in, xgi::Output *out)
00917   throw (xgi::exception::Exception)
00918 {
00919 
00920 
00921   *out << "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">" 
00922        << "<html><head><title>" << getApplicationDescriptor()->getClassName() << (nbSubProcesses_.value_ > 0 ? "MP " : " ") 
00923        << getApplicationDescriptor()->getInstance() << "</title>"
00924        << "<meta https-equiv=\"REFRESH\" content=\"0;url=/evf/html/defaultBasePage.html\">"
00925        << "</head></html>";
00926 }
00927 
00928 
00929 //______________________________________________________________________________
00930 
00931 
00932 void FUEventProcessor::spotlightWebPage(xgi::Input  *in, xgi::Output *out)
00933   throw (xgi::exception::Exception)
00934 {
00935 
00936   std::string urn = getApplicationDescriptor()->getURN();
00937 
00938   *out << "<!-- base href=\"/" <<  urn
00939        << "\"> -->" << std::endl;
00940   *out << "<html>"                                                   << std::endl;
00941   *out << "<head>"                                                   << std::endl;
00942   *out << "<link type=\"text/css\" rel=\"stylesheet\"";
00943   *out << " href=\"/evf/html/styles.css\"/>"                   << std::endl;
00944   *out << "<title>" << getApplicationDescriptor()->getClassName() 
00945        << getApplicationDescriptor()->getInstance() 
00946        << " MAIN</title>"     << std::endl;
00947   *out << "</head>"                                                  << std::endl;
00948   *out << "<body>"                                                   << std::endl;
00949   *out << "<table border=\"0\" width=\"100%\">"                      << std::endl;
00950   *out << "<tr>"                                                     << std::endl;
00951   *out << "  <td align=\"left\">"                                    << std::endl;
00952   *out << "    <img"                                                 << std::endl;
00953   *out << "     align=\"middle\""                                    << std::endl;
00954   *out << "     src=\"/evf/images/spoticon.jpg\""                            << std::endl;
00955   *out << "     alt=\"main\""                                        << std::endl;
00956   *out << "     width=\"64\""                                        << std::endl;
00957   *out << "     height=\"64\""                                       << std::endl;
00958   *out << "     border=\"\"/>"                                       << std::endl;
00959   *out << "    <b>"                                                  << std::endl;
00960   *out << getApplicationDescriptor()->getClassName() 
00961        << getApplicationDescriptor()->getInstance()                  << std::endl;
00962   *out << "      " << fsm_.stateName()->toString()                   << std::endl;
00963   *out << "    </b>"                                                 << std::endl;
00964   *out << "  </td>"                                                  << std::endl;
00965   *out << "  <td width=\"32\">"                                      << std::endl;
00966   *out << "    <a href=\"/urn:xdaq-application:lid=3\">"             << std::endl;
00967   *out << "      <img"                                               << std::endl;
00968   *out << "       align=\"middle\""                                  << std::endl;
00969   *out << "       src=\"/hyperdaq/images/HyperDAQ.jpg\""             << std::endl;
00970   *out << "       alt=\"HyperDAQ\""                                  << std::endl;
00971   *out << "       width=\"32\""                                      << std::endl;
00972   *out << "       height=\"32\""                                     << std::endl;
00973   *out << "       border=\"\"/>"                                     << std::endl;
00974   *out << "    </a>"                                                 << std::endl;
00975   *out << "  </td>"                                                  << std::endl;
00976   *out << "  <td width=\"32\">"                                      << std::endl;
00977   *out << "  </td>"                                                  << std::endl;
00978   *out << "  <td width=\"32\">"                                      << std::endl;
00979   *out << "    <a href=\"/" << urn << "/\">"                         << std::endl;
00980   *out << "      <img"                                               << std::endl;
00981   *out << "       align=\"middle\""                                  << std::endl;
00982   *out << "       src=\"/evf/images/epicon.jpg\""                    << std::endl;
00983   *out << "       alt=\"main\""                                      << std::endl;
00984   *out << "       width=\"32\""                                      << std::endl;
00985   *out << "       height=\"32\""                                     << std::endl;
00986   *out << "       border=\"\"/>"                                     << std::endl;
00987   *out << "    </a>"                                                 << std::endl;
00988   *out << "  </td>"                                                  << std::endl;
00989   *out << "</tr>"                                                    << std::endl;
00990   *out << "</table>"                                                 << std::endl;
00991 
00992   *out << "<hr/>"                                                    << std::endl;
00993   
00994   std::ostringstream ost;
00995   if(myProcess_) 
00996     ost << "/SubWeb?process=" << getpid() << "&method=moduleWeb&";
00997   else
00998     ost << "/moduleWeb?";
00999   urn += ost.str();
01000   if(evtProcessor_ && (myProcess_ || nbSubProcesses_.value_==0))
01001     evtProcessor_.taskWebPage(in,out,urn);
01002   else if(evtProcessor_)
01003     evtProcessor_.summaryWebPage(in,out,urn);
01004   else
01005     *out << "<td>HLT Unconfigured</td>" << std::endl;
01006   *out << "</table>"                                                 << std::endl;
01007   
01008   *out << "<br><textarea rows=" << 10 << " cols=80 scroll=yes>"      << std::endl;
01009   *out << configuration_                                             << std::endl;
01010   *out << "</textarea><P>"                                           << std::endl;
01011   
01012   *out << "</body>"                                                  << std::endl;
01013   *out << "</html>"                                                  << std::endl;
01014 
01015 
01016 }
01017 void FUEventProcessor::scalersWeb(xgi::Input  *in, xgi::Output *out)
01018   throw (xgi::exception::Exception)
01019 {
01020 
01021   out->getHTTPResponseHeader().addHeader( "Content-Type",
01022                                           "application/octet-stream" );
01023   out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
01024                                           "binary" );
01025   if(evtProcessor_ != 0){
01026     out->write( (char*)(evtProcessor_.getPackedTriggerReportAsStruct()), sizeof(TriggerReportStatic));
01027   }
01028 }
01029 
01030 void FUEventProcessor::pathNames(xgi::Input  *in, xgi::Output *out)
01031   throw (xgi::exception::Exception)
01032 {
01033 
01034   if(evtProcessor_ != 0){
01035     xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
01036     if(legenda !=0){
01037       std::string slegenda = ((xdata::String*)legenda)->value_;
01038       *out << slegenda << std::endl;
01039     }
01040   }
01041 }
01042 
01043 
01044 void FUEventProcessor::setAttachDqmToShm() throw (evf::Exception)  
01045 {
01046   std::string errmsg;
01047   try {
01048     edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
01049     if(edm::Service<FUShmDQMOutputService>().isAvailable())
01050       edm::Service<FUShmDQMOutputService>()->setAttachToShm();
01051   }
01052   catch (cms::Exception& e) {
01053     errmsg = "Failed to set to attach DQM service to shared memory: " + (std::string)e.what();
01054   }
01055   if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
01056 }
01057 
01058 
01059 void FUEventProcessor::attachDqmToShm() throw (evf::Exception)  
01060 {
01061   std::string errmsg;
01062   bool success = false;
01063   try {
01064     edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
01065     if(edm::Service<FUShmDQMOutputService>().isAvailable())
01066       success = edm::Service<FUShmDQMOutputService>()->attachToShm();
01067     if (!success) errmsg = "Failed to attach DQM service to shared memory";
01068   }
01069   catch (cms::Exception& e) {
01070     errmsg = "Failed to attach DQM service to shared memory: " + (std::string)e.what();
01071   }
01072   if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
01073 }
01074 
01075 
01076 
01077 void FUEventProcessor::detachDqmFromShm() throw (evf::Exception)
01078 {
01079   std::string errmsg;
01080   bool success = false;
01081   try {
01082     edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
01083     if(edm::Service<FUShmDQMOutputService>().isAvailable())
01084       success = edm::Service<FUShmDQMOutputService>()->detachFromShm();
01085     if (!success) errmsg = "Failed to detach DQM service from shared memory";
01086   }
01087   catch (cms::Exception& e) {
01088     errmsg = "Failed to detach DQM service from shared memory: " + (std::string)e.what();
01089   }
01090   if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
01091 }
01092 
01093 
01094 std::string FUEventProcessor::logsAsString()
01095 {
01096   std::ostringstream oss;
01097   if(logWrap_)
01098     {
01099       for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
01100         oss << logRing_[i] << std::endl;
01101       for(unsigned int i = 0; i <  logRingIndex_; i++)
01102         oss << logRing_[i] << std::endl;
01103     }
01104   else
01105       for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
01106         oss << logRing_[i] << std::endl;
01107     
01108   return oss.str();
01109 }
01110   
01111 void FUEventProcessor::localLog(std::string m)
01112 {
01113   timeval tv;
01114 
01115   gettimeofday(&tv,0);
01116   tm *uptm = localtime(&tv.tv_sec);
01117   char datestring[256];
01118   strftime(datestring, sizeof(datestring),"%c", uptm);
01119 
01120   if(logRingIndex_ == 0){logWrap_ = true; logRingIndex_ = logRingSize_;}
01121   logRingIndex_--;
01122   std::ostringstream timestamp;
01123   timestamp << " at " << datestring;
01124   m += timestamp.str();
01125   logRing_[logRingIndex_] = m;
01126 }
01127 
01128 void FUEventProcessor::startSupervisorLoop()
01129 {
01130   try {
01131     wlSupervising_=
01132       toolbox::task::getWorkLoopFactory()->getWorkLoop("Supervisor",
01133                                                        "waiting");
01134     if (!wlSupervising_->isActive()) wlSupervising_->activate();
01135     asSupervisor_ = toolbox::task::bind(this,&FUEventProcessor::supervisor,
01136                                         "Supervisor");
01137     wlSupervising_->submit(asSupervisor_);
01138     supervising_ = true;
01139   }
01140   catch (xcept::Exception& e) {
01141     std::string msg = "Failed to start workloop 'Supervisor'.";
01142     XCEPT_RETHROW(evf::Exception,msg,e);
01143   }
01144 }
01145 
01146 void FUEventProcessor::startReceivingLoop()
01147 {
01148   try {
01149     wlReceiving_=
01150       toolbox::task::getWorkLoopFactory()->getWorkLoop("Receiving",
01151                                                        "waiting");
01152     if (!wlReceiving_->isActive()) wlReceiving_->activate();
01153     asReceiveMsgAndExecute_ = toolbox::task::bind(this,&FUEventProcessor::receiving,
01154                                         "Receiving");
01155     wlReceiving_->submit(asReceiveMsgAndExecute_);
01156     receiving_ = true;
01157   }
01158   catch (xcept::Exception& e) {
01159     std::string msg = "Failed to start workloop 'Receiving'.";
01160     XCEPT_RETHROW(evf::Exception,msg,e);
01161   }
01162 }
01163 void FUEventProcessor::startReceivingMonitorLoop()
01164 {
01165   try {
01166     wlReceivingMonitor_=
01167       toolbox::task::getWorkLoopFactory()->getWorkLoop("ReceivingM",
01168                                                        "waiting");
01169     if (!wlReceivingMonitor_->isActive()) 
01170       wlReceivingMonitor_->activate();
01171     asReceiveMsgAndRead_ = 
01172       toolbox::task::bind(this,&FUEventProcessor::receivingAndMonitor,
01173                           "ReceivingM");
01174     wlReceivingMonitor_->submit(asReceiveMsgAndRead_);
01175     receivingM_ = true;
01176   }
01177   catch (xcept::Exception& e) {
01178     std::string msg = "Failed to start workloop 'ReceivingM'.";
01179     XCEPT_RETHROW(evf::Exception,msg,e);
01180   }
01181 }
01182 
01183 bool FUEventProcessor::receiving(toolbox::task::WorkLoop *)
01184 {
01185   MsgBuf msg;
01186   try{
01187     myProcess_->rcvSlave(msg,false); //will receive only messages from Master
01188     if(msg->mtype==MSQM_MESSAGE_TYPE_RLI)
01189       {
01190         rlimit rl;
01191         getrlimit(RLIMIT_CORE,&rl);
01192         rl.rlim_cur=0;
01193         setrlimit(RLIMIT_CORE,&rl);
01194         rlimit_coresize_changed_=true;
01195       }
01196     if (msg->mtype==MSQM_MESSAGE_TYPE_RLR)
01197       {
01198         //reset coresize limit
01199         setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
01200         rlimit_coresize_changed_=false;
01201       }
01202     if(msg->mtype==MSQM_MESSAGE_TYPE_STOP)
01203       {
01204         pthread_mutex_lock(&stop_lock_);
01205         try {
01206           fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
01207         }
01208         catch (...) {
01209           LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to go to Stopping state in slave EP, pid "
01210                                                  << getpid() << " The state on Stop event was not consistent");
01211         }
01212 
01213         try {
01214           stopClassic(); // call the normal sequence of stopping - as this is allowed to fail provisions must be made ...@@@EM
01215         }
01216         catch (...) {
01217           LOG4CPLUS_ERROR(getApplicationLogger(),"Slave EP 'receiving' workloop: exception " << getpid());
01218         }
01219 
01220         //destroy MessageService thread before exit     
01221         try{
01222           messageServicePresence_.reset();
01223         }
01224         catch(...) {
01225           LOG4CPLUS_ERROR(getApplicationLogger(),"SLAVE:Unable to destroy MessageServicePresence. pid:" << getpid() );
01226         }
01227 
01228         MsgBuf msg1(0,MSQS_MESSAGE_TYPE_STOP);
01229         myProcess_->postSlave(msg1,false);
01230         pthread_mutex_unlock(&stop_lock_);
01231         fclose(stdout);
01232         fclose(stderr);
01233         _exit(EXIT_SUCCESS);
01234       }
01235     if(msg->mtype==MSQM_MESSAGE_TYPE_FSTOP)
01236       _exit(EXIT_SUCCESS);
01237   }
01238   catch(evf::Exception &e){
01239     LOG4CPLUS_ERROR(getApplicationLogger(),"Slave EP pid:" << getpid() << " receiving WorkLoop exception: "<<e.what());
01240   }
01241   return true;
01242 }
01243 
01244 bool FUEventProcessor::supervisor(toolbox::task::WorkLoop *)
01245 {
01246   pthread_mutex_lock(&stop_lock_);
01247   if(subs_.size()!=nbSubProcesses_.value_)
01248     {
01249       pthread_mutex_lock(&pickup_lock_);
01250       if(subs_.size()!=nbSubProcesses_.value_) {
01251         subs_.resize(nbSubProcesses_.value_);
01252         spMStates_.resize(nbSubProcesses_.value_);
01253         spmStates_.resize(nbSubProcesses_.value_);
01254         for(unsigned int i = 0; i < spMStates_.size(); i++)
01255           {
01256             spMStates_[i] = edm::event_processor::sInit; 
01257             spmStates_[i] = 0; 
01258           }
01259       }
01260       pthread_mutex_unlock(&pickup_lock_);
01261     }
01262   bool running = fsm_.stateName()->toString()=="Enabled";
01263   bool stopping = fsm_.stateName()->toString()=="stopping";
01264   for(unsigned int i = 0; i < subs_.size(); i++)
01265     {
01266       if(subs_[i].alive()==-1000) continue;
01267       int sl;
01268       pid_t sub_pid = subs_[i].pid();
01269       pid_t killedOrNot = waitpid(sub_pid,&sl,WNOHANG);
01270 
01271       if(killedOrNot && killedOrNot==sub_pid) {
01272         pthread_mutex_lock(&pickup_lock_);
01273         //check if out of range or recreated (enable can clear vector)
01274         if (i<subs_.size() && subs_[i].alive()!=-1000) {
01275           subs_[i].setStatus((WIFEXITED(sl) != 0 ? 0 : -1));
01276           std::ostringstream ost;
01277           if(subs_[i].alive()==0) ost << " process exited with status " << WEXITSTATUS(sl);
01278           else if(WIFSIGNALED(sl)!=0) {
01279             ost << " process terminated with signal " << WTERMSIG(sl);
01280           }
01281           else ost << " process stopped ";
01282           //report unexpected slave exit in stop
01283           //if (stopping && (WEXITSTATUS(sl)!=0 || WIFSIGNALED(sl)!=0)) {
01284           //  LOG4CPLUS_WARN(getApplicationLogger(),ost.str() << ", slave pid:"<<getpid());
01285           //}
01286           subs_[i].countdown()=slaveRestartDelaySecs_.value_;
01287           subs_[i].setReasonForFailed(ost.str());
01288           spMStates_[i] = evtProcessor_.notstarted_state_code();
01289           spmStates_[i] = 0;
01290           std::ostringstream ost1;
01291           ost1 << "-E- Slave " << subs_[i].pid() << ost.str();
01292           localLog(ost1.str());
01293           if(!autoRestartSlaves_.value_) subs_[i].disconnect();
01294         }
01295         pthread_mutex_unlock(&pickup_lock_);
01296       }
01297     }
01298   pthread_mutex_unlock(&stop_lock_);    
01299   if(stopping) return true; // if in stopping we are done
01300 
01301   // check if we need to reset core dumps (15 min after last one)
01302   if (running && rlimit_coresize_changed_) {
01303     timeval newtv;
01304     gettimeofday(&newtv,0);
01305     int delta = newtv.tv_sec-lastCrashTime_.tv_sec;
01306     if (delta>60*15) {
01307       std::ostringstream ostr;
01308       ostr << " No more slave EP crashes on this machine in last 15 min. resetting core size limits";
01309       std::cout << ostr.str() << std::endl;
01310       LOG4CPLUS_INFO(getApplicationLogger(),ostr.str());
01311       setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
01312       MsgBuf master_message_rlr_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLR);
01313       for (unsigned int i = 0; i < subs_.size(); i++) {
01314         try {
01315           if (subs_[i].alive())
01316             subs_[i].post(master_message_rlr_,false);
01317         }
01318         catch (...) {}
01319       }
01320       rlimit_coresize_changed_=false;
01321       crashesThisRun_=0;
01322     }
01323   }
01324 
01325   if(running && edm_init_done_)
01326     {
01327       // if enabled, this loop will periodically check if dead slaves countdown has expired and restart them
01328       // this is only active while running, hence, the stop lock is acquired and only released at end of loop
01329       if(autoRestartSlaves_.value_){
01330         pthread_mutex_lock(&stop_lock_); //lockout slave killing at stop while you check for restarts
01331         for(unsigned int i = 0; i < subs_.size(); i++)
01332           {
01333             if(subs_[i].alive() != 1){
01334               if(subs_[i].countdown() == 0)
01335                 {
01336                   if(subs_[i].restartCount()>2){
01337                     LOG4CPLUS_WARN(getApplicationLogger()," Not restarting subprocess in slot " << i 
01338                                    << " - maximum restart count reached");
01339                     std::ostringstream ost1;
01340                     ost1 << "-W- Dead Process in slot " << i << " reached maximum restart count"; 
01341                     localLog(ost1.str());
01342                     subs_[i].countdown()--;
01343                     XCEPT_DECLARE(evf::Exception,
01344                                   sentinelException, ost1.str());
01345                     notifyQualified("error",sentinelException);
01346                     subs_[i].disconnect();
01347                     continue;
01348                   }
01349                   subs_[i].restartCount()++;
01350                   if (forkInEDM_.value_) {
01351                     restartForkInEDM(i);
01352                   }
01353                   else {
01354                     pid_t rr = subs_[i].forkNew();
01355                     if(rr==0)
01356                     {
01357                       myProcess_=&subs_[i];
01358                       scalersUpdates_ = 0;
01359                       int retval = pthread_mutex_destroy(&stop_lock_);
01360                       if(retval != 0) perror("error");
01361                       retval = pthread_mutex_init(&stop_lock_,0);
01362                       if(retval != 0) perror("error");
01363                       fsm_.disableRcmsStateNotification();
01364                       fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
01365                       fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition
01366                       fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition
01367                       try{
01368                         xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
01369                         if(lsid) {
01370                           ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died
01371                         }
01372                       }
01373                       catch(...){
01374                         std::cout << "trouble with lsindex during restart" << std::endl;
01375                       }
01376                       try{
01377                         xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
01378                         if(lstb) {
01379                           ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting
01380                         }
01381                       }
01382                       catch(...){
01383                         std::cout << "trouble with resetting flag for eol recovery " << std::endl;
01384                       }
01385 
01386                       evtProcessor_.adjustLsIndexForRestart();
01387                       evtProcessor_.resetPackedTriggerReport();
01388                       enableMPEPSlave();
01389                       return false; // exit the supervisor loop immediately in the child !!!
01390                     }
01391                   else
01392                     {
01393                       std::ostringstream ost1;
01394                       ost1 << "-I- New Process " << rr << " forked for slot " << i; 
01395                       localLog(ost1.str());
01396                     }
01397                   }
01398                 }
01399               if(subs_[i].countdown()>=0) subs_[i].countdown()--;
01400             }
01401           }
01402         pthread_mutex_unlock(&stop_lock_);
01403       } // finished handling replacement of dead slaves once they've been reaped
01404     }
01405   xdata::Serializable *lsid = 0; 
01406   xdata::Serializable *psid = 0;
01407   xdata::Serializable *dqmp = 0;
01408   xdata::UnsignedInteger32 *dqm = 0;
01409 
01410 
01411   
01412   if(running && edm_init_done_){  
01413     try{
01414       lsid = applicationInfoSpace_->find("lumiSectionIndex");
01415       psid = applicationInfoSpace_->find("prescaleSetIndex");
01416       nbProcessed = monitorInfoSpace_->find("nbProcessed");
01417       nbAccepted  = monitorInfoSpace_->find("nbAccepted");
01418       dqmp = applicationInfoSpace_-> find("nbDqmUpdates");      
01419     }
01420     catch(xdata::exception::Exception e){
01421       LOG4CPLUS_INFO(getApplicationLogger(),"could not retrieve some data - " << e.what());    
01422     }
01423 
01424     try{
01425       if(nbProcessed !=0 && nbAccepted !=0)
01426         {
01427           xdata::UnsignedInteger32*nbp = ((xdata::UnsignedInteger32*)nbProcessed);
01428           xdata::UnsignedInteger32*nba = ((xdata::UnsignedInteger32*)nbAccepted);
01429           xdata::UnsignedInteger32*ls  = ((xdata::UnsignedInteger32*)lsid);
01430           xdata::UnsignedInteger32*ps  = ((xdata::UnsignedInteger32*)psid);
01431           if(dqmp!=0)
01432             dqm = (xdata::UnsignedInteger32*)dqmp;
01433           if(dqm) dqm->value_ = 0;
01434           nbTotalDQM_ = 0;
01435           nbp->value_ = 0;
01436           nba->value_ = 0;
01437           nblive_ = 0;
01438           nbdead_ = 0;
01439           scalersUpdates_ = 0;
01440 
01441           for(unsigned int i = 0; i < subs_.size(); i++)
01442             {
01443               if(subs_[i].alive()>0)
01444                 {
01445                   nblive_++;
01446                   try{
01447                     subs_[i].post(master_message_prg_,true);
01448                     
01449                     unsigned long retval = subs_[i].rcvNonBlocking(master_message_prr_,true);
01450                     if(retval == (unsigned long) master_message_prr_->mtype){
01451                       prg* p = (struct prg*)(master_message_prr_->mtext);
01452                       subs_[i].setParams(p);
01453                       spMStates_[i] = p->Ms;
01454                       spmStates_[i] = p->ms;
01455                       cpustat_->addEntry(p->ms);
01456                       if(!subs_[i].inInconsistentState() && 
01457                          (p->Ms == edm::event_processor::sError 
01458                           || p->Ms == edm::event_processor::sInvalid
01459                           || p->Ms == edm::event_processor::sStopping))
01460                         {
01461                           std::ostringstream ost;
01462                           ost << "edm::eventprocessor slot " << i << " process id " 
01463                               << subs_[i].pid() << " not in Running state : Mstate=" 
01464                               << evtProcessor_.stateNameFromIndex(p->Ms) << " mstate="
01465                               << evtProcessor_.moduleNameFromIndex(p->ms) 
01466                               << " - Look into possible error messages from HLT process";
01467                           LOG4CPLUS_WARN(getApplicationLogger(),ost.str());
01468                         }
01469                       nbp->value_ += subs_[i].params().nbp;
01470                       nba->value_  += subs_[i].params().nba;
01471                       if(dqm)dqm->value_ += p->dqm;
01472                       nbTotalDQM_ +=  p->dqm;
01473                       scalersUpdates_ += p->trp;
01474                       if(p->ls > ls->value_) ls->value_ = p->ls;
01475                       if(p->ps != ps->value_) ps->value_ = p->ps;
01476                     }
01477                     else{
01478                       nbp->value_ += subs_[i].get_save_nbp();
01479                       nba->value_ += subs_[i].get_save_nba();
01480                     }
01481                   } 
01482                   catch(evf::Exception &e){
01483                     LOG4CPLUS_INFO(getApplicationLogger(),
01484                                    "could not send/receive msg on slot " 
01485                                    << i << " - " << e.what());    
01486                   }
01487                     
01488                 }
01489               else
01490                 {
01491                   nbp->value_ += subs_[i].get_save_nbp();
01492                   nba->value_ += subs_[i].get_save_nba();
01493                   nbdead_++;
01494                 }
01495             }
01496           if(nbp->value_>64){//have some slaves already processed more than one event ? (eventually make this == number of raw cells)
01497             for(unsigned int i = 0; i < subs_.size(); i++)
01498               {
01499                 if(subs_[i].params().nbp == 0){ // a slave has processed 0 events 
01500                   // check that the process is not stuck
01501                   if(subs_[i].alive()>0 && subs_[i].params().ms == 0) // the process is seen alive but in us=Invalid(0)
01502                     {
01503                       subs_[i].found_invalid();//increase the "found_invalid" counter
01504                       if(subs_[i].nfound_invalid() > 60){ //wait x monitor cycles (~1 min a good time ?) before doing something about a stuck slave
01505                         MsgBuf msg3(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_FSTOP);      // send a force-stop signal             
01506                         subs_[i].post(msg3,false);
01507                         std::ostringstream ost1;
01508                         ost1 << "-W- Process in slot " << i << " Never reached the running state - forcestopping it"; 
01509                         localLog(ost1.str());
01510                         LOG4CPLUS_ERROR(getApplicationLogger(),ost1.str());    
01511                         XCEPT_DECLARE(evf::Exception,
01512                                       sentinelException, ost1.str());
01513                         notifyQualified("error",sentinelException);
01514 
01515                       }
01516                     }
01517                 }
01518               }
01519           }
01520         }
01521     }
01522     catch(std::exception &e){
01523       LOG4CPLUS_INFO(getApplicationLogger(),"std exception - " << e.what());    
01524     }
01525     catch(...){
01526       LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception ");    
01527     }
01528   }
01529   else{
01530     for(unsigned int i = 0; i < subs_.size(); i++)
01531       {
01532         if(subs_[i].alive()==-1000)
01533           {
01534             spMStates_[i] = edm::event_processor::sInit;
01535             spmStates_[i] = 0;
01536           }
01537       }
01538   }
01539   try{
01540     monitorInfoSpace_->lock();
01541     monitorInfoSpace_->fireItemGroupChanged(names_,0);
01542     monitorInfoSpace_->unlock();
01543   }
01544   catch(xdata::exception::Exception &e)
01545     {
01546       LOG4CPLUS_ERROR(log_, "Exception from fireItemGroupChanged: " << e.what());
01547       //        localLog(e.what());
01548     }
01549   ::sleep(superSleepSec_.value_);       
01550   return true;
01551 }
01552 
01553 void FUEventProcessor::startScalersWorkLoop() throw (evf::Exception)
01554 {
01555   try {
01556     wlScalers_=
01557       toolbox::task::getWorkLoopFactory()->getWorkLoop("Scalers",
01558                                                        "waiting");
01559     if (!wlScalers_->isActive()) wlScalers_->activate();
01560     asScalers_ = toolbox::task::bind(this,&FUEventProcessor::scalers,
01561                                      "Scalers");
01562     
01563   wlScalers_->submit(asScalers_);
01564   wlScalersActive_ = true;
01565   }
01566   catch (xcept::Exception& e) {
01567     std::string msg = "Failed to start workloop 'Scalers'.";
01568     XCEPT_RETHROW(evf::Exception,msg,e);
01569   }
01570 }
01571 
01572 //______________________________________________________________________________
01573 
01574 void FUEventProcessor::startSummarizeWorkLoop() throw (evf::Exception)
01575 {
01576   try {
01577     wlSummarize_=
01578       toolbox::task::getWorkLoopFactory()->getWorkLoop("Summary",
01579                                                        "waiting");
01580     if (!wlSummarize_->isActive()) wlSummarize_->activate();
01581     
01582     asSummarize_ = toolbox::task::bind(this,&FUEventProcessor::summarize,
01583                                        "Summary");
01584 
01585     wlSummarize_->submit(asSummarize_);
01586     wlSummarizeActive_ = true;
01587   }
01588   catch (xcept::Exception& e) {
01589     std::string msg = "Failed to start workloop 'Summarize'.";
01590     XCEPT_RETHROW(evf::Exception,msg,e);
01591   }
01592 }
01593 
01594 //______________________________________________________________________________
01595 
01596 bool FUEventProcessor::scalers(toolbox::task::WorkLoop* wl)
01597 {
01598   if(evtProcessor_)
01599     {
01600       if(!evtProcessor_.getTriggerReport(true)) {
01601         wlScalersActive_ = false;
01602         return false;
01603       }
01604     }
01605   else
01606     {
01607       std::cout << getpid()<< " Scalers workloop, bailing out, no evtProcessor " << std::endl;
01608       wlScalersActive_ = false;
01609       return false;
01610     }
01611   if(myProcess_) 
01612     {
01613       //      std::cout << getpid() << "going to post on control queue from scalers" << std::endl;
01614       int ret = myProcess_->postSlave(evtProcessor_.getPackedTriggerReport(),false);
01615       if(ret!=0)      std::cout << "scalers workloop, error posting to sqs_ " << errno << std::endl;
01616       scalersUpdates_++;
01617     }
01618   else
01619     evtProcessor_.fireScalersUpdate();
01620   return true;
01621 }
01622 
01623 //______________________________________________________________________________
// Workloop action submitted to the "Summary" workloop (see startSummarizeWorkLoop()).
// For every subprocess slot whose alive() is positive it obtains one trigger-report
// (TRR) message -- either a previously postponed one or a freshly received one --
// and sums the reports that belong to the current reference lumi section into
// evtProcessor_'s packed trigger report. If at least one report was summed it
// publishes the result (updateRollingReport / fireScalersUpdate); otherwise it
// warns and sleeps 10 seconds. Optionally sends CPU / rate statistics.
// Returns false once the FSM state is no longer "Enabled", true otherwise.
01624 bool FUEventProcessor::summarize(toolbox::task::WorkLoop* wl)
01625 {
        // clear the packed report before this round's messages are summed into it
01626   evtProcessor_.resetPackedTriggerReport();
01627   bool atLeastOneProcessUpdatedSuccessfully = false;
        // number of slots for which a message was obtained without an exception
01628   int msgCount = 0;
01629   for (unsigned int i = 0; i < subs_.size(); i++)
01630     {
01631       if(subs_[i].alive()>0)
01632         {
01633           int ret = 0;
                // a report stored by add_postponed_trigger_update() in an earlier pass is
                // used instead of receiving a new message when the check returns true
01634           if(subs_[i].check_postponed_trigger_update(master_message_trr_,
01635                                                      evtProcessor_.getLumiSectionReferenceIndex()))
01636             {
01637               ret = MSQS_MESSAGE_TYPE_TRR;
01638               std::cout << "using postponed report from slot " << i << " for ls " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01639             }
01640           else{
01641             bool insync = false;
01642             bool exception_caught = false;
                  // receive until a TRR message with lumiSection >= the reference index
                  // arrives; reports for earlier lumi sections are dropped.
                  // NOTE(review): a non-TRR return value that does not throw keeps this
                  // loop going -- confirm that rcv() blocks, otherwise this can spin.
01643             while(!insync){
01644               try{
01645                 ret = subs_[i].rcv(master_message_trr_,false);
01646               }
01647               catch(evf::Exception &e)
01648                 {
01649                   std::cout << "exception in msgrcv on " << i 
01650                             << " " << subs_[i].alive() << " " << strerror(errno) << std::endl;
01651                   exception_caught = true;
01652                   break;
01653                   //do nothing special
01654                 }
01655               if(ret==MSQS_MESSAGE_TYPE_TRR) {
01656                 TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
01657                 if(trp->lumiSection >= evtProcessor_.getLumiSectionReferenceIndex()){
01658                   insync = true;
01659                 }
01660               }
01661             }
                  // receive failed: skip this slot, it is not counted in msgCount
01662             if(exception_caught) continue;
01663           }
01664           msgCount++;
01665           if(ret==MSQS_MESSAGE_TYPE_TRR) {
01666             TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
                  // a report from a later lumi section is stored for a later pass;
                  // a report for the reference lumi section is summed now
01667             if(trp->lumiSection > evtProcessor_.getLumiSectionReferenceIndex()){
01668               std::cout << "postpone handling of msg from slot " << i << " with Ls " <<  trp->lumiSection
01669                         << " should be " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01670               subs_[i].add_postponed_trigger_update(master_message_trr_);
01671             }else{
01672               atLeastOneProcessUpdatedSuccessfully = true;
01673               evtProcessor_.sumAndPackTriggerReport(master_message_trr_);
01674             }
01675           }
01676           else std::cout << "msgrcv returned error " << errno << std::endl;
01677         }
01678     }
        // publish the summed report together with expected / reporting process counts
01679   if(atLeastOneProcessUpdatedSuccessfully){
01680     nbSubProcessesReporting_.value_ = msgCount;
01681     evtProcessor_.getPackedTriggerReportAsStruct()->nbExpected = nbSubProcesses_.value_;
01682     evtProcessor_.getPackedTriggerReportAsStruct()->nbReporting = nbSubProcessesReporting_.value_;
01683     evtProcessor_.updateRollingReport();
01684     evtProcessor_.fireScalersUpdate();
01685   }
01686   else{
01687     LOG4CPLUS_WARN(getApplicationLogger(),"Summarize loop: no process updated successfully - sleep 10 seconds before trying again");          
          // the lumi section increment is withdrawn only when no slot delivered any message at all
01688     if(msgCount==0) evtProcessor_.withdrawLumiSectionIncrement();
01689     nbSubProcessesReporting_.value_ = 0;
01690     ::sleep(10);
01691   }
        // NOTE(review): this clears wlScalersActive_, while startSummarizeWorkLoop() sets
        // wlSummarizeActive_ -- possibly a copy/paste slip; confirm which flag the code
        // waiting for this workloop to finish actually reads before changing it.
01692   if(fsm_.stateName()->toString()!="Enabled"){
01693     wlScalersActive_ = false;
01694     return false;
01695   }
01696   //  cpustat_->printStat();
01697   if(iDieStatisticsGathering_.value_){
01698     try{
            // keep the previous counters, then refresh them through utils::procCpuStat()
01699       unsigned long long idleTmp=idleProcStats_;
01700       unsigned long long allPSTmp=allProcStats_;
01701       idleProcStats_=allProcStats_=0;
01702 
01703       utils::procCpuStat(idleProcStats_,allProcStats_);
01704       timeval oldtime=lastProcReport_;
01705       gettimeofday(&lastProcReport_,0);
01706 
            // needs a previous sample and a non-zero delta (also avoids a division by zero);
            // the value is 1000 minus the idle share of the delta, i.e. a per-mille figure
            // (presumably the non-idle CPU share -- confirm what procCpuStat() reports)
01707       if (allPSTmp!=0 && idleTmp!=0 && allProcStats_!=allPSTmp) {
01708         cpustat_->setCPUStat(1000 - ((idleProcStats_-idleTmp)*1000)/(allProcStats_-allPSTmp));
              // wall-clock time between the two samples, in milliseconds
01709         int deltaTms=1000 * (lastProcReport_.tv_sec-oldtime.tv_sec)
01710                     + (lastProcReport_.tv_usec-oldtime.tv_usec)/1000;
01711         cpustat_->setElapsed(deltaTms);
01712       }
01713       else {
01714         cpustat_->setCPUStat(0);
01715         cpustat_->setElapsed(0);
01716       }
01717 
            // send CPU statistics and the raw packed trigger report, tagged with the
            // reference lumi section index
01718       TriggerReportStatic *trsp = evtProcessor_.getPackedTriggerReportAsStruct();
01719       cpustat_ ->setNproc(trsp->eventSummary.totalEvents);
01720       cpustat_ ->sendStat(evtProcessor_.getLumiSectionReferenceIndex());
01721       ratestat_->sendStat((unsigned char*)trsp,
01722                           sizeof(TriggerReportStatic),
01723                           evtProcessor_.getLumiSectionReferenceIndex());
01724     }catch(evf::Exception &e){
01725       LOG4CPLUS_INFO(getApplicationLogger(),"coud not send statistics"
01726                      << e.what());
01727     }
01728   }
        // reset is done on every pass that reaches this point, whether or not statistics were sent
01729   cpustat_->reset();
01730   return true;
01731 }
01732 
01733 
01734 
// Workloop action that receives one message through myProcess_->rcvSlave() and
// answers it according to the message type:
//   MSQM_MESSAGE_TYPE_MCS - replies with the micro-state text (MCR message)
//   MSQM_MESSAGE_TYPE_PRG - replies with a filled `prg` monitoring record; may
//                           terminate the process if exitOnError_ is set and
//                           the state is sError
//   MSQM_MESSAGE_TYPE_WEB - renders a web page and ships it through anonymousPipe_
//   MSQM_MESSAGE_TYPE_TRP - ignored
// An evf::Exception raised while handling is printed to stdout and swallowed.
// Always returns true.
01735 bool FUEventProcessor::receivingAndMonitor(toolbox::task::WorkLoop *)
01736 {
01737   try{
01738     myProcess_->rcvSlave(slave_message_monitoring_,true); //will receive only messages from Master
01739     switch(slave_message_monitoring_->mtype)
01740       {
01741       case MSQM_MESSAGE_TYPE_MCS:
01742         {
                // microState() is called with a null input; its output text becomes the reply
01743           xgi::Input *in = 0;
01744           xgi::Output out;
01745           evtProcessor_.microState(in,&out);
01746           MsgBuf msg1(out.str().size(),MSQS_MESSAGE_TYPE_MCR);
                // copies exactly size() bytes, so no terminating NUL is written here
01747           strncpy(msg1->mtext,out.str().c_str(),out.str().size());
01748           myProcess_->postSlave(msg1,true);
01749           break;
01750         }
01751       
01752       case MSQM_MESSAGE_TYPE_PRG:
01753         {
01754           xdata::Serializable *dqmp = 0;
01755           xdata::UnsignedInteger32 *dqm = 0;
01756           evtProcessor_.monitoring(0);
                // "nbDqmUpdates" is optional: a failed lookup leaves dqmp null and dqm is reported as 0.
                // NOTE(review): the exception is caught by value; catching by reference would avoid the copy.
01757           try{
01758             dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
01759           }  catch(xdata::exception::Exception e){}
01760           if(dqmp!=0)
01761             dqm = (xdata::UnsignedInteger32*)dqmp;
01762 
01763           //      monitorInfoSpace_->lock();  
                // fill the monitoring record in place inside the reply message buffer
01764           prg * data           = (prg*)slave_message_prr_->mtext;
01765           data->ls             = evtProcessor_.lsid_;
01766           data->eols           = evtProcessor_.lastLumiUsingEol_;
01767           data->ps             = evtProcessor_.psid_;
01768           data->nbp            = evtProcessor_->totalEvents();
01769           data->nba            = evtProcessor_->totalEventsPassed();
01770           data->Ms             = evtProcessor_.epMAltState_.value_;
01771           data->ms             = evtProcessor_.epmAltState_.value_;
01772           if(dqm) data->dqm    = dqm->value_; else data->dqm = 0;
01773           data->trp            = scalersUpdates_;
01774           //      monitorInfoSpace_->unlock();  
01775           myProcess_->postSlave(slave_message_prr_,true);
01776           if(exitOnError_.value_)
01777           { 
01778             // after each monitoring cycle check if we are in inconsistent state and exit if configured to do so  
01779             //      std::cout << getpid() << "receivingAndMonitor: trying to acquire stop lock " << std::endl;
01780             if(data->Ms == edm::event_processor::sError) 
01781               { 
                      // Poll the FSM state under stop_lock_ once per second while it stays "Enabled".
                      // count is incremented after each sleep, so _exit(-1) is reached on the
                      // iteration that follows the sixth sleep; the loop ends without exiting
                      // as soon as the state is no longer "Enabled".
01782                 bool running = true;
01783                 int count = 0;
01784                 while(running){
01785                   int retval = pthread_mutex_lock(&stop_lock_);
01786                   if(retval != 0) perror("error");
01787                   running = fsm_.stateName()->toString()=="Enabled";
01788                   if(count>5) _exit(-1);
01789                   pthread_mutex_unlock(&stop_lock_);
01790                   if(running) {::sleep(1); count++;}
01791                 }
01792               }
01793           }
01794           break;
01795         }
01796       case MSQM_MESSAGE_TYPE_WEB:
01797         {
01798           xgi::Input  *in = 0;
01799           xgi::Output out;
01800           unsigned int bytesToSend = 0;
01801           MsgBuf msg1(NUMERIC_MESSAGE_SIZE,MSQS_MESSAGE_TYPE_WEB);
                // the request text has the form "method" or "method&args"
01802           std::string query = slave_message_monitoring_->mtext;
01803           size_t pos = query.find_first_of("&");
01804           std::string method;
01805           std::string args;
01806           if(pos!=std::string::npos)  
01807             {
01808               method = query.substr(0,pos);
01809               args = query.substr(pos+1,query.length()-pos-1);
01810             }
01811           else
01812             method=query;
01813 
01814           if(method=="Spotlight")
01815             {
01816               spotlightWebPage(in,&out);
01817             }
01818           else if(method=="procStat")
01819             {
01820               procStat(in,&out);
01821             }
01822           else if(method=="moduleWeb")
01823             {
                    // args is a ';'-separated list of "name%value" tokens; each pair is stored
                    // in the environment of a local MyCgi object handed to moduleWeb().
                    // Tokens without '%' are ignored.
01824               internal::MyCgi mycgi;
01825               boost::char_separator<char> sep(";");
01826               boost::tokenizer<boost::char_separator<char> > tokens(args, sep);
01827               for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
01828                    tok_iter != tokens.end(); ++tok_iter){
01829                 size_t pos = (*tok_iter).find_first_of("%");
01830                 if(pos != std::string::npos){
01831                   std::string first  = (*tok_iter).substr(0    ,                        pos);
01832                   std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
01833                   mycgi.getEnvironment()[first]=second;
01834                 }
01835               }
01836               moduleWeb(&mycgi,&out);
01837             }
01838           else if(method=="Default")
01839             {
01840               defaultWebPage(in,&out);
01841             }
01842           else 
01843             {
01844               out << "Error 404!!!!!!!!" << std::endl;
01845             }
01846 
01847 
                // Reply protocol as implemented below: the page is written to the anonymous
                // pipe in chunks of at most MAX_PIPE_BUFFER_SIZE bytes, and each chunk is
                // announced by a WEB message whose text is the chunk size in decimal.
                // An empty page is announced by a single message carrying "0".
01848           bytesToSend = out.str().size();
01849           unsigned int cycle = 0;
01850           if(bytesToSend==0)
01851             {
01852               snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", bytesToSend);
01853               myProcess_->postSlave(msg1,true);
01854             }
                // NOTE(review): the return value of write() is ignored, so a short or failed
                // write still announces msgSize bytes; "%d" is used with unsigned values.
01855           while(bytesToSend !=0){
01856             unsigned int msgSize = bytesToSend>MAX_PIPE_BUFFER_SIZE ? MAX_PIPE_BUFFER_SIZE : bytesToSend;
01857             write(anonymousPipe_[PIPE_WRITE],
01858                   out.str().c_str()+MAX_PIPE_BUFFER_SIZE*cycle,
01859                   msgSize);
01860             snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", msgSize);
01861             myProcess_->postSlave(msg1,true);
01862             bytesToSend -= msgSize;
01863             cycle++;
01864           }
01865           break;
01866         }
01867       case MSQM_MESSAGE_TYPE_TRP:
01868         {
                // nothing to do for this message type
01869           break;
01870         }
01871       }
01872   }
01873   catch(evf::Exception &e){std::cout << "exception caught in recevingM: " << e.what() << std::endl;}
01874   return true;
01875 }
01876 
01877 void FUEventProcessor::startSignalMonitorWorkLoop() throw (evf::Exception)
01878 {
01879   //todo rewind/check semaphore
01880   //start workloop
01881   try {
01882     wlSignalMonitor_=
01883       toolbox::task::getWorkLoopFactory()->getWorkLoop("SignalMonitor",
01884                                                        "waiting");
01885 
01886     if (!wlSignalMonitor_->isActive()) wlSignalMonitor_->activate();
01887     asSignalMonitor_ = toolbox::task::bind(this,&FUEventProcessor::sigmon,
01888                                        "SignalMonitor");
01889     wlSignalMonitor_->submit(asSignalMonitor_);
01890     signalMonitorActive_ = true;
01891   }
01892   catch (xcept::Exception& e) {
01893     std::string msg = "Failed to start workloop 'SignalMonitor'. (3)";
01894     std::cout << e.what() << std::endl;
01895     XCEPT_RETHROW(evf::Exception,msg,e);
01896   }
01897 }
01898  
01899 
01900 bool FUEventProcessor::sigmon(toolbox::task::WorkLoop* wl)
01901 {
01902   while (1) {
01903     sem_wait(sigmon_sem_);
01904     std::cout << " received signal notification from slave!"<< std::endl;
01905   
01906     //check if shutdown time
01907     bool running = fsm_.stateName()->toString()=="Enabled";
01908     bool stopping = fsm_.stateName()->toString()=="stopping";
01909     bool enabling = fsm_.stateName()->toString()=="enabling";
01910     if (!running && !enabling) {
01911       signalMonitorActive_ = false;
01912       return false;
01913     }
01914 
01915     crashesThisRun_++;
01916     gettimeofday(&lastCrashTime_,0);
01917 
01918     //set core size limit to 0 in master and slaves
01919     if (crashesThisRun_>=crashesToDump_.value_ && (running || stopping) && !rlimit_coresize_changed_) {
01920 
01921       rlimit rlold;
01922       getrlimit(RLIMIT_CORE,&rlold);
01923       rlimit rlnew = rlold;
01924       rlnew.rlim_cur=0;
01925       setrlimit(RLIMIT_CORE,&rlnew);
01926       rlimit_coresize_changed_=true;
01927       MsgBuf master_message_rli_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLI);
01928       //in case of frequent crashes, allow first slot to dump (until restart)
01929       unsigned int min=1;
01930       for (unsigned int i = min; i < subs_.size(); i++) {
01931         try {
01932           if (subs_[i].alive()) {
01933             subs_[i].post(master_message_rli_,false);
01934           }
01935         }
01936         catch (...) {}
01937       }
01938       std::ostringstream ostr;
01939       ostr << "Number of recent slave crashes reaches " << crashesThisRun_
01940            << ". Disabling core dumps for next 15 minutes in this FilterUnit";
01941       LOG4CPLUS_WARN(getApplicationLogger(),ostr.str());
01942     }
01943   }//end while loop
01944   signalMonitorActive_ = false;
01945   return false;
01946 }
01947 
01948 
01949 bool FUEventProcessor::enableCommon()
01950 {
01951   try {    
01952     if(hasShMem_) attachDqmToShm();
01953     int sc = 0;
01954     evtProcessor_->clearCounters();
01955     if(isRunNumberSetter_)
01956       evtProcessor_->setRunNumber(runNumber_.value_);
01957     else
01958       evtProcessor_->declareRunNumber(runNumber_.value_);
01959 
01960     try{
01961       ::sleep(1);
01962       evtProcessor_->runAsync();
01963       sc = evtProcessor_->statusAsync();
01964     }
01965     catch(cms::Exception &e) {
01966       reasonForFailedState_ = e.explainSelf();
01967       fsm_.fireFailed(reasonForFailedState_,this);
01968       localLog(reasonForFailedState_);
01969       return false;
01970     }    
01971     catch(std::exception &e) {
01972       reasonForFailedState_  = e.what();
01973       fsm_.fireFailed(reasonForFailedState_,this);
01974       localLog(reasonForFailedState_);
01975       return false;
01976     }
01977     catch(...) {
01978       reasonForFailedState_ = "Unknown Exception";
01979       fsm_.fireFailed(reasonForFailedState_,this);
01980       localLog(reasonForFailedState_);
01981       return false;
01982     }
01983     if(sc != 0) {
01984       std::ostringstream oss;
01985       oss<<"EventProcessor::runAsync returned status code " << sc;
01986       reasonForFailedState_ = oss.str();
01987       fsm_.fireFailed(reasonForFailedState_,this);
01988       localLog(reasonForFailedState_);
01989       return false;
01990     }
01991   }
01992   catch (xcept::Exception &e) {
01993     reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
01994     fsm_.fireFailed(reasonForFailedState_,this);
01995     localLog(reasonForFailedState_);
01996     return false;
01997   }
01998   try{
01999     fsm_.fireEvent("EnableDone",this);
02000   }
02001   catch (xcept::Exception &e) {
02002     std::cout << "exception " << (std::string)e.what() << std::endl;
02003     throw;
02004   }
02005 
02006   return false;
02007 }
02008 
02009 void FUEventProcessor::forkProcessFromEDM_helper(void * addr) {
02010   ((FUEventProcessor*)addr)->forkProcessesFromEDM();
02011 }
02012 
02013 void FUEventProcessor::forkProcessesFromEDM() {
02014 
02015   moduleweb::ForkParams * forkParams = &(forkInfoObj_->forkParams);
02016   unsigned int forkFrom=0;
02017   unsigned int forkTo=nbSubProcesses_.value_;
02018   if (forkParams->slotId>=0) {
02019     forkFrom=forkParams->slotId;
02020     forkTo=forkParams->slotId+1;
02021   }
02022 
02023   //before fork, make sure to disconnect output modules from Shm
02024   try {
02025     if (sorRef_) {
02026       std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
02027       for (unsigned int i=0;i<shmOutputs.size();i++) {
02028         //unregister PID from ShmBuffer/RB
02029         shmOutputs[i]->unregisterFromShm();
02030         //disconnect from Shm
02031         shmOutputs[i]->stop();
02032       }
02033     }
02034   }
02035   catch (std::exception &e)
02036   {
02037     reasonForFailedState_ =  (std::string)"Thrown exception while disconnecting ShmOutputModule from Shm: " + e.what();
02038     LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02039     fsm_.fireFailed(reasonForFailedState_,this);
02040     localLog(reasonForFailedState_);
02041   }
02042   catch (...) {
02043     reasonForFailedState_ =  "Thrown unknown exception while disconnecting ShmOutputModule from Shm: ";
02044     LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02045     fsm_.fireFailed(reasonForFailedState_,this);
02046     localLog(reasonForFailedState_);
02047   }
02048 
02049   std::string currentState = fsm_.stateName()->toString();
02050 
02051   //destroy MessageServicePresence thread before fork
02052   if (currentState!="stopping") {
02053     try {
02054       messageServicePresence_.reset();
02055     }
02056     catch (...) {
02057       LOG4CPLUS_ERROR(getApplicationLogger(),"Unable to destroy MessageService thread before fork!");
02058     }
02059   }
02060 
02061   if (currentState=="stopping") {
02062     LOG4CPLUS_ERROR(getApplicationLogger(),"Can not fork subprocesses in state " << fsm_.stateName()->toString());
02063     forkParams->isMaster=1;
02064     forkInfoObj_->forkParams.slotId=-1;
02065     forkInfoObj_->forkParams.restart=0;
02066   }
02067   //fork loop
02068   else for(unsigned int i=forkFrom; i<forkTo; i++)
02069   {
02070     int retval = subs_[i].forkNew();
02071     if(retval==0)
02072     {
02073       forkParams->isMaster=0;
02074       myProcess_ = &subs_[i];
02075       // dirty hack: delete/recreate global binary semaphore for later use in child
02076       delete toolbox::mem::_s_mutex_ptr_;
02077       toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
02078       int retval = pthread_mutex_destroy(&stop_lock_);
02079       if(retval != 0) perror("error");
02080       retval = pthread_mutex_init(&stop_lock_,0);
02081       if(retval != 0) perror("error");
02082       fsm_.disableRcmsStateNotification();
02083 
02084       //recreate MessageLogger thread in slave after fork
02085       try{
02086         edm::PresenceFactory *pf = edm::PresenceFactory::get();
02087         if(pf != 0) {
02088           messageServicePresence_ = pf->makePresence("MessageServicePresence");
02089         }
02090         else {
02091           LOG4CPLUS_ERROR(getApplicationLogger(),
02092               "SLAVE: Unable to create message service presence. pid:"<<getpid());
02093         }
02094       }
02095       catch(...) {
02096         LOG4CPLUS_ERROR(getApplicationLogger(),"SLAVE: Unknown Exception in MessageServicePresence. pid:"<<getpid());
02097       }
02098 
02099       ML::MLlog4cplus::setAppl(this);
02100 
02101       //reconnect to Shm from output modules
02102       try {
02103         if (sorRef_) {
02104           std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
02105           for (unsigned int i=0;i<shmOutputs.size();i++)
02106             shmOutputs[i]->start();
02107         }
02108       }
02109       catch (...)
02110       {
02111         LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception (ShmOutputModule sending InitMsg (pid:"<<getpid() <<")");
02112       }
02113 
02114       if (forkParams->restart) {
02115         //do restart things
02116         scalersUpdates_ = 0;
02117         try {
02118           fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
02119           fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition
02120           fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition
02121         } catch (...) {
02122           LOG4CPLUS_WARN(getApplicationLogger(),"Failed to Stop/Enable FSM of the restarted slave EP");
02123         }
02124         try{
02125           xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
02126           if(lsid) {
02127             ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died
02128           }
02129         }
02130         catch(...){
02131           std::cout << "trouble with lsindex during restart" << std::endl;
02132         }
02133         try{
02134           xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
02135           if(lstb) {
02136             ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting
02137           }
02138         }
02139         catch(...){
02140           std::cout << "trouble with resetting flag for eol recovery " << std::endl;
02141         }
02142         evtProcessor_.adjustLsIndexForRestart();
02143         evtProcessor_.resetPackedTriggerReport();
02144       }
02145 
02146       //start other threads
02147       startReceivingLoop();
02148       startReceivingMonitorLoop();
02149       startScalersWorkLoop();
02150       while(!evtProcessor_.isWaitingForLs())
02151         ::usleep(100000);//wait for scalers loop to start
02152 
02153       //connect DQMShmOutputModule
02154       if(hasShMem_) attachDqmToShm();
02155 
02156       //catch transition error if we are already Enabled
02157       try {
02158         fsm_.fireEvent("EnableDone",this);
02159       }
02160       catch (...) {}
02161 
02162       //make sure workloops are started
02163       while (!wlReceiving_->isActive() || !wlReceivingMonitor_->isActive()) usleep(10000);
02164 
02165       //unmask signals
02166       sigset_t tmpset_thread;
02167       sigemptyset(&tmpset_thread);
02168       sigaddset(&tmpset_thread, SIGQUIT);
02169       sigaddset(&tmpset_thread, SIGILL);
02170       sigaddset(&tmpset_thread, SIGABRT);
02171       sigaddset(&tmpset_thread, SIGFPE);
02172       sigaddset(&tmpset_thread, SIGSEGV);
02173       sigaddset(&tmpset_thread, SIGALRM);
02174       //sigprocmask(SIG_UNBLOCK, &tmpset_thread, 0);
02175       pthread_sigmask(SIG_UNBLOCK,&tmpset_thread,0);
02176      
02177       //set signal handlers 
02178       struct sigaction sa;
02179       sigset_t tmpset;
02180       memset(&tmpset,0,sizeof(tmpset));
02181       sigemptyset(&tmpset);
02182       sa.sa_mask=tmpset;
02183       sa.sa_flags=SA_RESETHAND | SA_SIGINFO;
02184       sa.sa_handler=0;
02185       sa.sa_sigaction=evfep_sighandler;
02186 
02187       sigaction(SIGQUIT,&sa,0);
02188       sigaction(SIGILL,&sa,0);
02189       sigaction(SIGABRT,&sa,0);
02190       sigaction(SIGFPE,&sa,0);
02191       sigaction(SIGSEGV,&sa,0);
02192       sa.sa_sigaction=evfep_alarmhandler;
02193       sigaction(SIGALRM,&sa,0);
02194 
02195       //child return to DaqSource
02196       return ;
02197     }
02198     else {
02199 
02200       forkParams->isMaster=1;
02201       forkInfoObj_->forkParams.slotId=-1;
02202       if (forkParams->restart) {
02203         std::ostringstream ost1;
02204         ost1 << "-I- New Process " << retval << " forked for slot " << forkParams->slotId;
02205         localLog(ost1.str());
02206       }
02207       forkInfoObj_->forkParams.restart=0;
02208       //start "crash" receiver workloop
02209     }
02210   }
02211 
02212   //recreate MessageLogger thread after fork
02213   try{
02214     //release the presense factory in master
02215     edm::PresenceFactory *pf = edm::PresenceFactory::get();
02216     if(pf != 0) {
02217       messageServicePresence_ = pf->makePresence("MessageServicePresence");
02218     }
02219     else {
02220       LOG4CPLUS_ERROR(getApplicationLogger(),
02221           "Unable to recreate message service presence ");
02222     }
02223   }
02224   catch(...) {
02225     LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception in MessageServicePresence");
02226   }
02227   restart_in_progress_=false;
02228   edm_init_done_=true;
02229 }
02230 
02231 bool FUEventProcessor::enableForkInEDM() 
02232 {
02233   evtProcessor_.resetWaiting();
02234   try {
02235     //set to connect to Shm later
02236     //if(hasShMem_) setAttachDqmToShm();
02237 
02238     int sc = 0;
02239     //maybe not needed in MP mode
02240     evtProcessor_->clearCounters();
02241     if(isRunNumberSetter_)
02242       evtProcessor_->setRunNumber(runNumber_.value_);
02243     else
02244       evtProcessor_->declareRunNumber(runNumber_.value_);
02245 
02246     //prepare object used to communicate with DaqSource
02247     pthread_mutex_destroy(&forkObjLock_);
02248     pthread_mutex_init(&forkObjLock_,0);
02249     if (forkInfoObj_) delete forkInfoObj_;
02250     forkInfoObj_ = new moduleweb::ForkInfoObj();
02251     forkInfoObj_->mst_lock_=&forkObjLock_;
02252     forkInfoObj_->fuAddr=(void*)this;
02253     forkInfoObj_->forkHandler = forkProcessFromEDM_helper;
02254     forkInfoObj_->forkParams.slotId=-1;
02255     forkInfoObj_->forkParams.restart=0;
02256     forkInfoObj_->forkParams.isMaster=-1;
02257     forkInfoObj_->stopCondition=0;
02258     if (mwrRef_)
02259       mwrRef_->publishForkInfo(std::string("DaqSource"),forkInfoObj_);
02260 
02261     evtProcessor_->runAsync();
02262     sc = evtProcessor_->statusAsync();
02263 
02264     if(sc != 0) {
02265       std::ostringstream oss;
02266       oss<<"EventProcessor::runAsync returned status code " << sc;
02267       reasonForFailedState_ = oss.str();
02268       fsm_.fireFailed(reasonForFailedState_,this);
02269       LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02270       return false;
02271     }
02272   }
02273   //catch exceptions on master side
02274   catch(cms::Exception &e) {
02275     reasonForFailedState_ = e.explainSelf();
02276     fsm_.fireFailed(reasonForFailedState_,this);
02277     LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02278     return false;
02279   }    
02280   catch(std::exception &e) {
02281     reasonForFailedState_  = e.what();
02282     fsm_.fireFailed(reasonForFailedState_,this);
02283     LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02284     return false;
02285   }
02286   catch(...) {
02287     reasonForFailedState_ = "Unknown Exception";
02288     fsm_.fireFailed(reasonForFailedState_,this);
02289     LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02290     return false;
02291   }
02292   return true;
02293 }
02294 
02295 bool FUEventProcessor::restartForkInEDM(unsigned int slotId) {
02296   //daqsource will keep this lock until master returns after fork
02297   //so that we don't do another EP restart in between
02298   forkInfoObj_->lock();
02299   forkInfoObj_->forkParams.slotId=slotId;
02300   forkInfoObj_->forkParams.restart=true;
02301   forkInfoObj_->forkParams.isMaster=1;
02302   forkInfoObj_->stopCondition=0;
02303   LOG4CPLUS_DEBUG(log_, " restarting subprocess in slot "<< slotId <<": posting on control semaphore");
02304   sem_post(forkInfoObj_->control_sem_);
02305   forkInfoObj_->unlock();
02306   usleep(1000);
02307   //sleep until fork is performed
02308   int count=50;
02309   restart_in_progress_=true;
02310   while (restart_in_progress_ && count--) usleep(20000);
02311   return true;
02312 }
02313 
02314 bool FUEventProcessor::enableClassic()
02315 {
02316   bool retval = enableCommon();
02317   while(evtProcessor_->getState()!= edm::event_processor::sRunning){
02318     LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
02319     ::sleep(1);
02320   }
02321   
02322   //  implementation moved to EPWrapper
02323   //  startScalersWorkLoop(); // this is now not done any longer 
02324   localLog("-I- Start completed");
02325   return retval;
02326 }
02327 bool FUEventProcessor::enableMPEPSlave()
02328 {
02329   //all this happens only in the child process
02330 
02331   startReceivingLoop();
02332   startReceivingMonitorLoop();
02333   evtProcessor_.resetWaiting();
02334   startScalersWorkLoop();
02335   while(!evtProcessor_.isWaitingForLs())
02336     ::sleep(1);
02337 
02338   // @EM test do not run monitor loop in slave, only receiving&Monitor
02339   //  evtProcessor_.startMonitoringWorkLoop();
02340   try{
02341     //    evtProcessor_.makeServicesOnly();
02342     try{
02343       edm::PresenceFactory *pf = edm::PresenceFactory::get();
02344       if(pf != 0) {
02345         pf->makePresence("MessageServicePresence").release();
02346       }
02347       else {
02348         LOG4CPLUS_ERROR(getApplicationLogger(),
02349                         "Unable to create message service presence ");
02350       }
02351     } 
02352     catch(...) {
02353       LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
02354     }
02355   ML::MLlog4cplus::setAppl(this);
02356   }       
02357   catch (xcept::Exception &e) {
02358     reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
02359     fsm_.fireFailed(reasonForFailedState_,this);
02360     localLog(reasonForFailedState_);
02361   }
02362   bool retval =  enableCommon();
02363   //  while(evtProcessor_->getState()!= edm::event_processor::sRunning){
02364   //    LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
02365   //    ::sleep(1);
02366   //  }
02367   return retval;
02368 }
02369 
02370 bool FUEventProcessor::stopClassic()
02371 {
02372   bool failed=false;
02373   try {
02374     LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping :) ...");
02375     edm::EventProcessor::StatusCode rc = evtProcessor_.stop();
02376     if(rc == edm::EventProcessor::epSuccess) 
02377       fsm_.fireEvent("StopDone",this);
02378     else
02379       {
02380         failed=true;
02381         //      epMState_ = evtProcessor_->currentStateName();
02382         if(rc == edm::EventProcessor::epTimedOut)
02383           reasonForFailedState_ = "EventProcessor stop timed out";
02384         else
02385           reasonForFailedState_ = "EventProcessor did not receive STOP event";
02386       }
02387   }
02388   catch (xcept::Exception &e) {
02389     failed=true;
02390     reasonForFailedState_ = "Stopping FAILED: " + (std::string)e.what();
02391   }
02392   catch (edm::Exception &e) {
02393     failed=true;
02394     reasonForFailedState_ = "Stopping FAILED: " + (std::string)e.what();
02395   }
02396   catch (...) {
02397     failed=true;
02398     reasonForFailedState_= "Stopping FAILED: unknown exception";
02399   }
02400   try {
02401     if (hasShMem_) {
02402       detachDqmFromShm();
02403       if (failed) 
02404         LOG4CPLUS_WARN(getApplicationLogger(), 
02405             "In failed STOP - success detaching DQM from Shm. pid:" << getpid());
02406     }
02407   }
02408   catch (cms::Exception & e) {
02409     failed=true;
02410     reasonForFailedState_= "Stopping FAILED: " + (std::string)e.what();
02411   }
02412   catch (...) {
02413     failed=true;
02414     reasonForFailedState_= "DQM detach failed: Unknown exception";
02415   }
02416 
02417   if (failed) {
02418     LOG4CPLUS_FATAL(getApplicationLogger(),"STOP failed: "
02419         << reasonForFailedState_ << " (pid:" << getpid()<<")");
02420     localLog(reasonForFailedState_);
02421     fsm_.fireFailed(reasonForFailedState_,this);
02422   }
02423 
02424   LOG4CPLUS_INFO(getApplicationLogger(),"Finished stopping!");
02425   localLog("-I- Stop completed");
02426   return false;
02427 }
02428 
02429 void FUEventProcessor::stopSlavesAndAcknowledge()
02430 {
02431   MsgBuf msg(0,MSQM_MESSAGE_TYPE_STOP);
02432   MsgBuf msg1(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_STOP);
02433 
02434   std::vector<bool> processes_to_stop(nbSubProcesses_.value_,false);
02435   for(unsigned int i = 0; i < subs_.size(); i++)
02436     {
02437       pthread_mutex_lock(&stop_lock_);
02438       if(subs_[i].alive()>0){
02439         processes_to_stop[i] = true;
02440         subs_[i].post(msg,false);
02441       }
02442       pthread_mutex_unlock(&stop_lock_);
02443     }
02444   for(unsigned int i = 0; i < subs_.size(); i++)
02445     {
02446       pthread_mutex_lock(&stop_lock_);
02447       if(processes_to_stop[i]){
02448         try{
02449           subs_[i].rcv(msg1,false);
02450         }
02451         catch(evf::Exception &e){
02452           std::ostringstream ost;
02453           ost << "failed to get STOP - errno ->" << errno << " " << e.what(); 
02454           reasonForFailedState_ = ost.str();
02455           LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02456           //      fsm_.fireFailed(reasonForFailedState_,this);
02457           localLog(reasonForFailedState_);
02458           pthread_mutex_unlock(&stop_lock_);
02459           continue;
02460         }
02461       }
02462       else {
02463         pthread_mutex_unlock(&stop_lock_);
02464         continue;
02465       }
02466       pthread_mutex_unlock(&stop_lock_);
02467       if(msg1->mtype==MSQS_MESSAGE_TYPE_STOP)
02468         while(subs_[i].alive()>0) ::usleep(10000);
02469       subs_[i].disconnect();
02470     }
02471   //  subs_.clear();
02472 
02473 }
02474 
02475 void FUEventProcessor::microState(xgi::Input *in,xgi::Output *out)
02476 {
02477   std::string urn = getApplicationDescriptor()->getURN();
02478   try{
02479     evtProcessor_.stateNameFromIndex(0);
02480     evtProcessor_.moduleNameFromIndex(0);
02481   if(myProcess_) {std::cout << "microstate called for child! bail out" << std::endl; return;}
02482   *out << "<tr><td>" << fsm_.stateName()->toString() 
02483        << "</td><td>"<< (myProcess_ ? "S" : "M") <<"</td><td>" << nblive_ << "</td><td>"
02484        << nbdead_ << "</td><td><a href=\"/" << urn << "/procStat\">" << getpid() <<"</a></td>";
02485   evtProcessor_.microState(in,out);
02486   *out << "<td></td><td>" << nbTotalDQM_ 
02487        << "</td><td>" << evtProcessor_.getScalersUpdates() << "</td></tr>";
02488   if(nbSubProcesses_.value_!=0 && !myProcess_) 
02489     {
02490       pthread_mutex_lock(&start_lock_);
02491       for(unsigned int i = 0; i < subs_.size(); i++)
02492         {
02493           try{
02494             if(subs_[i].alive()>0)
02495               {
02496                 *out << "<tr><td  bgcolor=\"#00FF00\" id=\"a"
02497                      << i << "\">""Alive</td><td>S</td><td>"
02498                      << subs_[i].queueId() << "<td>" 
02499                      << subs_[i].queueStatus()<< "/"
02500                      << subs_[i].queueOccupancy() << "/"
02501                      << subs_[i].queuePidOfLastSend() << "/"
02502                      << subs_[i].queuePidOfLastReceive() 
02503                      << "</td><td><a id=\"p"<< i << "\" href=\"SubWeb?process=" 
02504                      << subs_[i].pid() << "&method=procStat\">" 
02505                      << subs_[i].pid()<<"</a></td>" //<< msg->mtext;
02506                      << "<td>" << evtProcessor_.stateNameFromIndex(subs_[i].params().Ms) << "</td><td>" 
02507                      << evtProcessor_.moduleNameFromIndex(subs_[i].params().ms) << "</td><td>" 
02508                      << subs_[i].params().nba << "/" << subs_[i].params().nbp 
02509                      << " (" << float(subs_[i].params().nba)/float(subs_[i].params().nbp)*100. <<"%)" 
02510                      << "</td><td>" << subs_[i].params().ls  << "/" << subs_[i].params().ls 
02511                      << "</td><td>" << subs_[i].params().ps 
02512                      << "</td><td" 
02513                      << ((subs_[i].params().eols<subs_[i].params().ls) ? " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")  << ">" 
02514                      << subs_[i].params().eols  
02515                      << "</td><td>" << subs_[i].params().dqm 
02516                      << "</td><td>" << subs_[i].params().trp << "</td>";
02517               }
02518             else 
02519               {
02520                 pthread_mutex_lock(&pickup_lock_);
02521                 *out << "<tr><td id=\"a"<< i << "\" ";
02522                 if(subs_[i].alive()==-1000)
02523                   *out << " bgcolor=\"#bbaabb\">NotInitialized";
02524                 else
02525                   *out << (subs_[i].alive()==0 ? ">Done" : " bgcolor=\"#FF0000\">Dead");
02526                 *out << "</td><td>S</td><td>"<< subs_[i].queueId() << "<td>" 
02527                      << subs_[i].queueStatus() << "/"
02528                      << subs_[i].queueOccupancy() << "/"
02529                      << subs_[i].queuePidOfLastSend() << "/"
02530                      << subs_[i].queuePidOfLastReceive() 
02531                      << "</td><td id=\"p"<< i << "\">"
02532                      <<subs_[i].pid()<<"</td><td colspan=\"5\">" << subs_[i].reasonForFailed();
02533                 if(subs_[i].alive()!=0 && subs_[i].alive()!=-1000) 
02534                   {
02535                     if(autoRestartSlaves_ && subs_[i].restartCount()<=2) 
02536                       *out << " will restart in " << subs_[i].countdown() << " s";
02537                     else if(autoRestartSlaves_)
02538                       *out << " reached maximum restart count";
02539                     else *out << " autoRestart is disabled ";
02540                   }
02541                 *out << "</td><td" 
02542                      << ((subs_[i].params().eols<subs_[i].params().ls) ? 
02543                          " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")  
02544                      << ">" 
02545                      << subs_[i].params().eols  
02546                      << "</td><td>" << subs_[i].params().dqm 
02547                      << "</td><td>" << subs_[i].params().trp << "</td>";
02548                 pthread_mutex_unlock(&pickup_lock_);
02549               }
02550             *out << "</tr>";
02551           }
02552           catch(evf::Exception &e){
02553             *out << "<tr><td id=\"a"<< i << "\" " 
02554                  <<"bgcolor=\"#FFFF00\">NotResp</td><td>S</td><td>"<< subs_[i].queueId() << "<td>" 
02555                  << subs_[i].queueStatus() << "/"
02556                  << subs_[i].queueOccupancy() << "/"
02557                  << subs_[i].queuePidOfLastSend() << "/"
02558                  << subs_[i].queuePidOfLastReceive() 
02559                  << "</td><td id=\"p"<< i << "\">"
02560                  <<subs_[i].pid()<<"</td>";
02561           }
02562         }
02563       pthread_mutex_unlock(&start_lock_); 
02564     }
02565   }
02566       catch(evf::Exception &e)
02567       {
02568         LOG4CPLUS_INFO(getApplicationLogger(),"evf::Exception caught in microstate - " << e.what());    
02569       }
02570     catch(cms::Exception &e)
02571       {
02572         LOG4CPLUS_INFO(getApplicationLogger(),"cms::Exception caught in microstate - " << e.what());    
02573       }
02574     catch(std::exception &e)
02575       {
02576         LOG4CPLUS_INFO(getApplicationLogger(),"std::Exception caught in microstate - " << e.what());    
02577       }
02578     catch(...)
02579       {
02580         LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception caught in microstate - ");    
02581       }
02582 
02583 }
02584 
02585 
02586 void FUEventProcessor::updater(xgi::Input *in,xgi::Output *out)
02587 {
02588   using namespace utils;
02589 
02590   *out << updaterStatic_;
02591   mDiv(out,"loads");
02592   uptime(out);
02593   cDiv(out);
02594   mDiv(out,"st",fsm_.stateName()->toString());
02595   mDiv(out,"ru",runNumber_.toString());
02596   mDiv(out,"nsl",nbSubProcesses_.value_);
02597   mDiv(out,"nsr",nbSubProcessesReporting_.value_);
02598   mDiv(out,"cl");
02599   *out << getApplicationDescriptor()->getClassName() 
02600        << (nbSubProcesses_.value_ > 0 ? "MP " : " ");
02601   cDiv(out);
02602   mDiv(out,"in",getApplicationDescriptor()->getInstance());
02603   if(fsm_.stateName()->toString() != "Halted" && fsm_.stateName()->toString() != "halting"){
02604     mDiv(out,"hlt");
02605     *out << "<a href=\"" << configString_.toString() << "\">HLT Config</a>";
02606     cDiv(out);
02607     *out << std::endl;
02608   }
02609   else
02610     mDiv(out,"hlt","Not yet...");
02611 
02612   mDiv(out,"sq",squidPresent_.toString());
02613   mDiv(out,"vwl",(supervising_ ? "Active" : "Not Initialized"));
02614   mDiv(out,"mwl",evtProcessor_.wlMonitoring());
02615   if(nbProcessed != 0 && nbAccepted != 0)
02616     {
02617       mDiv(out,"tt",((xdata::UnsignedInteger32*)nbProcessed)->value_);
02618       mDiv(out,"ac",((xdata::UnsignedInteger32*)nbAccepted)->value_);
02619     }
02620   else
02621     {
02622       mDiv(out,"tt",0);
02623       mDiv(out,"ac",0);
02624     }
02625   if(!myProcess_)
02626     mDiv(out,"swl",(wlSummarizeActive_ ? "Active" : "Inactive"));
02627   else
02628     mDiv(out,"swl",(wlScalersActive_ ? "Active" : "Inactive"));
02629 
02630   mDiv(out,"idi",iDieUrl_.value_);
02631   if(vp_!=0){
02632     mDiv(out,"vpi",(unsigned int) vp_);
02633     if(vulture_->hasStarted()>=0)
02634       mDiv(out,"vul","Prowling");
02635     else
02636       mDiv(out,"vul","Dead");
02637   }
02638   else{
02639     mDiv(out,"vul",(vulture_==0 ? "Nope" : "Hatching"));
02640   }    
02641   if(evtProcessor_){
02642     mDiv(out,"ll");
02643     *out << evtProcessor_.lastLumi().ls
02644          << "," << evtProcessor_.lastLumi().proc << "," << evtProcessor_.lastLumi().acc;
02645     cDiv(out);
02646   }
02647   mDiv(out,"lg");
02648   for(unsigned int i = logRingIndex_; i<logRingSize_; i++)
02649     *out << logRing_[i] << std::endl;
02650   if(logWrap_)
02651     for(unsigned int i = 0; i<logRingIndex_; i++)
02652       *out << logRing_[i] << std::endl;
02653   cDiv(out);
02654   mDiv(out,"cha");
02655   if(cpustat_) *out << cpustat_->getChart();
02656   cDiv(out);
02657 }
02658 
02659 void FUEventProcessor::procStat(xgi::Input *in, xgi::Output *out)
02660 {
02661   evf::utils::procStat(out);
02662 }
02663 
02664 void FUEventProcessor::sendMessageOverMonitorQueue(MsgBuf &buf)
02665 {
02666   if(myProcess_) myProcess_->postSlave(buf,true);
02667 }
02668 
02669 void FUEventProcessor::makeStaticInfo()
02670 {
02671   using namespace utils;
02672   std::ostringstream ost;
02673   mDiv(&ost,"ve");
02674   ost<< "$Revision: 1.160 $ (" << edm::getReleaseVersion() <<")";
02675   cDiv(&ost);
02676   mDiv(&ost,"ou",outPut_.toString());
02677   mDiv(&ost,"sh",hasShMem_.toString());
02678   mDiv(&ost,"mw",hasModuleWebRegistry_.toString());
02679   mDiv(&ost,"sw",hasServiceWebRegistry_.toString());
02680   
02681   xdata::Serializable *monsleep = 0;
02682   xdata::Serializable *lstimeout = 0;
02683   try{
02684     monsleep  = applicationInfoSpace_->find("monSleepSec");
02685     lstimeout = applicationInfoSpace_->find("lsTimeOut");
02686   }
02687   catch(xdata::exception::Exception e){
02688   }
02689   
02690   if(monsleep!=0)
02691     mDiv(&ost,"ms",monsleep->toString());
02692   if(lstimeout!=0)
02693     mDiv(&ost,"lst",lstimeout->toString());
02694   char cbuf[sizeof(struct utsname)];
02695   struct utsname* buf = (struct utsname*)cbuf;
02696   uname(buf);
02697   mDiv(&ost,"sysinfo");
02698   ost << buf->sysname << " " << buf->nodename 
02699       << " " << buf->release << " " << buf->version << " " << buf->machine;
02700   cDiv(&ost);
02701   updaterStatic_ = ost.str();
02702 }
02703 
02704 void FUEventProcessor::handleSignalSlave(int sig, siginfo_t* info, void* c)
02705 {
02706   //notify master
02707   sem_post(sigmon_sem_);
02708 
02709   //sleep while master takes action
02710   sleep(2);
02711 
02712   //set up alarm if handler deadlocks on unsafe actions
02713   alarm(5);
02714   
02715   std::cout << "--- Slave EP signal handler caught signal " << sig << " process id is " << info->si_pid <<" ---" << std::endl;
02716   std::cout << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl;
02717   std::cout << "--- Stacktrace follows --" << std::endl;
02718   std::ostringstream stacktr;
02719   toolbox::stacktrace(20,stacktr);
02720   std::cout << stacktr.str();
02721   if (!rlimit_coresize_changed_)
02722     std::cout << "--- Dumping core." <<  " --- " << std::endl;
02723   else
02724     std::cout << "--- Core dump count exceeded on this FU. ---"<<std::endl;
02725  
02726   std::string hasdump = "";
02727   if (rlimit_coresize_changed_) hasdump = " (core dump disabled) ";
02728 
02729   LOG4CPLUS_ERROR(getApplicationLogger(),    "--- Slave EP signal handler caught signal " << sig << ". process id is " << getpid() 
02730                                           << " on node " << toolbox::net::getHostName() << " ---" << std::endl
02731                                           << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl
02732                                           << "--- Stacktrace follows"<< hasdump << " ---" << std::endl << stacktr.str()
02733                                           );
02734 
02735   //re-raise signal with default handler (will cause core dump if enabled)
02736   raise(sig);
02737 }
02738 
02739 
02740 XDAQ_INSTANTIATOR_IMPL(evf::FUEventProcessor) // XDAQ instantiator macro for this class; presumably emits the factory hook the framework uses to create the application - confirm in the xdaq headers