CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_6_2_5/src/EventFilter/Processor/src/FUEventProcessor.cc

Go to the documentation of this file.
00001 
00002 //
00003 // FUEventProcessor
00004 // ----------------
00005 //
00007 
00008 #include "FUEventProcessor.h"
00009 #include "procUtils.h"
00010 #include "EventFilter/Utilities/interface/CPUStat.h"
00011 #include "EventFilter/Utilities/interface/RateStat.h"
00012 
00013 #include "EventFilter/Utilities/interface/Exception.h"
00014 
00015 #include "EventFilter/Message2log4cplus/interface/MLlog4cplus.h"
00016 #include "EventFilter/Modules/interface/FUShmDQMOutputService.h"
00017 #include "EventFilter/Utilities/interface/ServiceWebRegistry.h"
00018 #include "EventFilter/Utilities/interface/ServiceWeb.h"
00019 #include "EventFilter/Utilities/interface/ModuleWeb.h"
00020 #include "EventFilter/Utilities/interface/ModuleWebRegistry.h"
00021 #include "EventFilter/Modules/src/FUShmOutputModule.h"
00022 
00023 #include "FWCore/PluginManager/interface/ProblemTracker.h"
00024 #include "FWCore/PluginManager/interface/PresenceFactory.h"
00025 #include "FWCore/Utilities/interface/Presence.h"
00026 #include "FWCore/Utilities/interface/Exception.h"
00027 #include "FWCore/Version/interface/GetReleaseVersion.h"
00028 #include "FWCore/ServiceRegistry/interface/Service.h"
00029 
00030 #include "toolbox/BSem.h" 
00031 #include "toolbox/Runtime.h"
00032 #include "toolbox/stacktrace.h"
00033 #include "toolbox/net/Utils.h"
00034 
00035 #include <boost/tokenizer.hpp>
00036 
00037 #include "xcept/tools.h"
00038 #include "xgi/Method.h"
00039 
00040 #include "cgicc/CgiDefs.h"
00041 #include "cgicc/Cgicc.h"
00042 #include "cgicc/FormEntry.h"
00043 
00044 
00045 #include <sys/wait.h>
00046 #include <sys/utsname.h>
00047 #include <sys/mman.h>
00048 #include <signal.h>
00049 
00050 #include <typeinfo>
00051 #include <stdlib.h>
00052 #include <stdio.h>
00053 #include <errno.h>
00054 
00055 
00056 using namespace evf;
00057 using namespace cgicc;
00058 namespace toolbox {
00059   namespace mem {
00060     extern toolbox::BSem * _s_mutex_ptr_; 
00061   }
00062 }
00063 
00064 //signal handler (global)
00065 namespace evf {
00066   FUEventProcessor * FUInstancePtr_;
00067   int evfep_raised_signal;
00068   void evfep_sighandler(int sig, siginfo_t* info, void* c)
00069   {
00070     evfep_raised_signal=sig;
00071     FUInstancePtr_->handleSignalSlave(sig, info, c);
00072   }
00073   void evfep_alarmhandler(int sig, siginfo_t* info, void* c)
00074   {
00075     if (evfep_raised_signal) {
00076       signal(evfep_raised_signal,SIG_DFL);
00077       raise(evfep_raised_signal);
00078     }
00079   }
00080 }
00081 
00083 // construction/destruction
00085 
00086 //______________________________________________________________________________
00087 FUEventProcessor::FUEventProcessor(xdaq::ApplicationStub *s) 
00088   : xdaq::Application(s)
00089   , fsm_(this)
00090   , log_(getApplicationLogger())
00091   , evtProcessor_(log_, getApplicationDescriptor()->getInstance())
00092   , runNumber_(0)
00093   , epInitialized_(false)
00094   , outPut_(true)
00095   , autoRestartSlaves_(false)
00096   , slaveRestartDelaySecs_(10)
00097   , hasShMem_(true)
00098   , hasPrescaleService_(true)
00099   , hasModuleWebRegistry_(true)
00100   , hasServiceWebRegistry_(true)
00101   , isRunNumberSetter_(true)
00102   , iDieStatisticsGathering_(false)
00103   , outprev_(true)
00104   , exitOnError_(true)
00105   , reasonForFailedState_()
00106   , squidnet_(3128,"http://localhost:8000/RELEASE-NOTES.txt")
00107   , logRing_(logRingSize_)
00108   , logRingIndex_(logRingSize_)
00109   , logWrap_(false)
00110   , nbSubProcesses_(0)
00111   , nbSubProcessesReporting_(0)
00112   , forkInEDM_(true)
00113   , nblive_(0)
00114   , nbdead_(0)
00115   , nbTotalDQM_(0)
00116   , wlReceiving_(0)
00117   , asReceiveMsgAndExecute_(0)
00118   , receiving_(false) 
00119   , wlReceivingMonitor_(0)
00120   , asReceiveMsgAndRead_(0)
00121   , receivingM_(false)
00122   , myProcess_(0)
00123   , wlSupervising_(0)
00124   , asSupervisor_(0)
00125   , supervising_(false)
00126   , wlSignalMonitor_(0)
00127   , asSignalMonitor_(0)
00128   , signalMonitorActive_(false)
00129   , monitorInfoSpace_(0)
00130   , monitorLegendaInfoSpace_(0)
00131   , applicationInfoSpace_(0)
00132   , nbProcessed(0)
00133   , nbAccepted(0)
00134   , scalersInfoSpace_(0)
00135   , scalersLegendaInfoSpace_(0)
00136   , wlScalers_(0)
00137   , asScalers_(0)
00138   , wlScalersActive_(false)
00139   , scalersUpdates_(0)
00140   , wlSummarize_(0)
00141   , asSummarize_(0)
00142   , wlSummarizeActive_(false)
00143   , superSleepSec_(1)
00144   , iDieUrl_("none")
00145   , vulture_(0)
00146   , vp_(0)
00147   , cpustat_(0)
00148   , ratestat_(0)
00149   , mwrRef_(nullptr)
00150   , sorRef_(nullptr)
00151   , master_message_prg_(0,MSQM_MESSAGE_TYPE_PRG)
00152   , master_message_prr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_PRR)
00153   , slave_message_prr_(sizeof(prg),MSQS_MESSAGE_TYPE_PRR)
00154   , master_message_trr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_TRR)
00155   , edm_init_done_(true)
00156   , crashesThisRun_(false)
00157   , rlimit_coresize_changed_(false)
00158   , crashesToDump_(2)
00159   , sigmon_sem_(0)
00160   , datasetCounting_(true)
00161 {
00162   using namespace utils;
00163 
00164   FUInstancePtr_=this;
00165 
00166   names_.push_back("nbProcessed"    );
00167   names_.push_back("nbAccepted"     );
00168   names_.push_back("epMacroStateInt");
00169   names_.push_back("epMicroStateInt");
00170   // create pipe for web communication
00171   int retpipe = pipe(anonymousPipe_);
00172   if(retpipe != 0)
00173         LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to create pipe");
00174   // check squid presence
00175   squidPresent_ = squidnet_.check();
00176   //pass application parameters to FWEPWrapper
00177   evtProcessor_.setAppDesc(getApplicationDescriptor());
00178   evtProcessor_.setAppCtxt(getApplicationContext());
00179   // bind relevant callbacks to finite state machine
00180   fsm_.initialize<evf::FUEventProcessor>(this);
00181   
00182   //set sourceId_
00183   url_ =
00184     getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00185     getApplicationDescriptor()->getURN();
00186   class_   =getApplicationDescriptor()->getClassName();
00187   instance_=getApplicationDescriptor()->getInstance();
00188   sourceId_=class_.toString()+"_"+instance_.toString();
00189   LOG4CPLUS_INFO(getApplicationLogger(),sourceId_ <<" constructor"         );
00190   LOG4CPLUS_INFO(getApplicationLogger(),"CMSSW_BASE:"<<getenv("CMSSW_BASE"));
00191   
00192   getApplicationDescriptor()->setAttribute("icon", "/evf/images/epicon.jpg");
00193   
00194   xdata::InfoSpace *ispace = getApplicationInfoSpace();
00195   applicationInfoSpace_ = ispace;
00196 
00197   // default configuration
00198   ispace->fireItemAvailable("parameterSet",         &configString_                );
00199   ispace->fireItemAvailable("epInitialized",        &epInitialized_               );
00200   ispace->fireItemAvailable("stateName",             fsm_.stateName()             );
00201   ispace->fireItemAvailable("runNumber",            &runNumber_                   );
00202   ispace->fireItemAvailable("outputEnabled",        &outPut_                      );
00203 
00204   ispace->fireItemAvailable("hasSharedMemory",      &hasShMem_);
00205   ispace->fireItemAvailable("hasPrescaleService",   &hasPrescaleService_          );
00206   ispace->fireItemAvailable("hasModuleWebRegistry", &hasModuleWebRegistry_        );
00207   ispace->fireItemAvailable("hasServiceWebRegistry", &hasServiceWebRegistry_      );
00208   ispace->fireItemAvailable("isRunNumberSetter",    &isRunNumberSetter_           );
00209   ispace->fireItemAvailable("iDieStatisticsGathering",   &iDieStatisticsGathering_);
00210   ispace->fireItemAvailable("rcmsStateListener",     fsm_.rcmsStateListener()     );
00211   ispace->fireItemAvailable("foundRcmsStateListener",fsm_.foundRcmsStateListener());
00212   ispace->fireItemAvailable("nbSubProcesses",       &nbSubProcesses_              );
00213   ispace->fireItemAvailable("nbSubProcessesReporting",&nbSubProcessesReporting_   );
00214   ispace->fireItemAvailable("forkInEDM"             ,&forkInEDM_                  );
00215   ispace->fireItemAvailable("superSleepSec",        &superSleepSec_               );
00216   ispace->fireItemAvailable("autoRestartSlaves",    &autoRestartSlaves_           );
00217   ispace->fireItemAvailable("slaveRestartDelaySecs",&slaveRestartDelaySecs_       );
00218   ispace->fireItemAvailable("iDieUrl",              &iDieUrl_                     );
00219   ispace->fireItemAvailable("crashesToDump"         ,&crashesToDump_              );
00220   ispace->fireItemAvailable("datasetCounting"       ,&datasetCounting_            );
00221 
00222   // Add infospace listeners for exporting data values
00223   getApplicationInfoSpace()->addItemChangedListener("parameterSet",        this);
00224   getApplicationInfoSpace()->addItemChangedListener("outputEnabled",       this);
00225 
00226   // findRcmsStateListener
00227   fsm_.findRcmsStateListener();
00228   
00229   // initialize monitoring infospace
00230 
00231   std::string monInfoSpaceName="evf-eventprocessor-status-monitor";
00232   toolbox::net::URN urn = this->createQualifiedInfoSpace(monInfoSpaceName);
00233   monitorInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00234 
00235   std::string monLegendaInfoSpaceName="evf-eventprocessor-status-legenda";
00236   urn = this->createQualifiedInfoSpace(monLegendaInfoSpaceName);
00237   monitorLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00238 
00239   
00240   monitorInfoSpace_->fireItemAvailable("url",                      &url_            );
00241   monitorInfoSpace_->fireItemAvailable("class",                    &class_          );
00242   monitorInfoSpace_->fireItemAvailable("instance",                 &instance_       );
00243   monitorInfoSpace_->fireItemAvailable("runNumber",                &runNumber_      );
00244   monitorInfoSpace_->fireItemAvailable("stateName",                 fsm_.stateName()); 
00245 
00246   monitorInfoSpace_->fireItemAvailable("squidPresent",             &squidPresent_   );
00247 
00248   std::string scalersInfoSpaceName="evf-eventprocessor-scalers-monitor";
00249   urn = this->createQualifiedInfoSpace(scalersInfoSpaceName);
00250   scalersInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00251 
00252   std::string scalersLegendaInfoSpaceName="evf-eventprocessor-scalers-legenda";
00253   urn = this->createQualifiedInfoSpace(scalersLegendaInfoSpaceName);
00254   scalersLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00255 
00256 
00257 
00258   evtProcessor_.setScalersInfoSpace(scalersInfoSpace_,scalersLegendaInfoSpace_);
00259   scalersInfoSpace_->fireItemAvailable("instance", &instance_);
00260 
00261   evtProcessor_.setApplicationInfoSpace(ispace);
00262   evtProcessor_.setMonitorInfoSpace(monitorInfoSpace_,monitorLegendaInfoSpace_);
00263   evtProcessor_.publishConfigAndMonitorItems(nbSubProcesses_.value_!=0);
00264 
00265   //subprocess state vectors for MP
00266   monitorInfoSpace_->fireItemAvailable("epMacroStateInt",             &spMStates_); 
00267   monitorInfoSpace_->fireItemAvailable("epMicroStateInt",             &spmStates_); 
00268   
00269   // Bind web interface
00270   xgi::bind(this, &FUEventProcessor::css,              "styles.css");
00271   xgi::bind(this, &FUEventProcessor::defaultWebPage,   "Default"   );
00272   xgi::bind(this, &FUEventProcessor::spotlightWebPage, "Spotlight" );
00273   xgi::bind(this, &FUEventProcessor::scalersWeb,       "scalersWeb");
00274   xgi::bind(this, &FUEventProcessor::pathNames,        "pathNames" );
00275   xgi::bind(this, &FUEventProcessor::subWeb,           "SubWeb"    );
00276   xgi::bind(this, &FUEventProcessor::getSlavePids,     "getSlavePids");
00277   xgi::bind(this, &FUEventProcessor::moduleWeb,        "moduleWeb" );
00278   xgi::bind(this, &FUEventProcessor::serviceWeb,       "serviceWeb");
00279   xgi::bind(this, &FUEventProcessor::microState,       "microState");
00280   xgi::bind(this, &FUEventProcessor::updater,          "updater"   );
00281   xgi::bind(this, &FUEventProcessor::procStat,         "procStat"  );
00282 
00283   // instantiate the plugin manager, not referenced here after!
00284 
00285   edm::AssertHandler ah;
00286 
00287   //create Message Service thread and pass ownership to auto_ptr that we destroy before fork
00288   try{
00289     LOG4CPLUS_DEBUG(getApplicationLogger(),
00290                     "Trying to create message service presence ");
00291     edm::PresenceFactory *pf = edm::PresenceFactory::get();
00292     if(pf != 0) {
00293       messageServicePresence_= pf->makePresence("MessageServicePresence");
00294     }
00295     else {
00296       LOG4CPLUS_ERROR(getApplicationLogger(),
00297                       "Unable to create message service presence ");
00298     }
00299   } 
00300   catch(...) {
00301     LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
00302   }
00303   ML::MLlog4cplus::setAppl(this);      
00304 
00305   typedef std::set<xdaq::ApplicationDescriptor*> AppDescSet_t;
00306   typedef AppDescSet_t::iterator                 AppDescIter_t;
00307   
00308   AppDescSet_t rcms=
00309     getApplicationContext()->getDefaultZone()->
00310     getApplicationDescriptors("RCMSStateListener");
00311   if(rcms.size()==0) 
00312     {
00313       LOG4CPLUS_WARN(getApplicationLogger(),
00314                        "MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
00315       //        localLog("-W- MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
00316     }
00317   else
00318     {
00319       AppDescIter_t it = rcms.begin();
00320       evtProcessor_.setRcms(*it);
00321     }
00322   pthread_mutex_init(&start_lock_,0);
00323   pthread_mutex_init(&stop_lock_,0);
00324   pthread_mutex_init(&pickup_lock_,0);
00325 
00326   forkInfoObj_=nullptr;
00327   pthread_mutex_init(&forkObjLock_,0);
00328   makeStaticInfo();
00329   startSupervisorLoop();  
00330 
00331   if(vulture_==0) vulture_ = new Vulture(true);
00332 
00334 
00335   AppDescSet_t setOfiDie=
00336     getApplicationContext()->getDefaultZone()->
00337     getApplicationDescriptors("evf::iDie");
00338   
00339   for (AppDescIter_t it=setOfiDie.begin();it!=setOfiDie.end();++it)
00340     if ((*it)->getInstance()==0) // there has to be only one instance of iDie
00341       iDieUrl_ = (*it)->getContextDescriptor()->getURL() + "/" + (*it)->getURN();
00342 
00343   //save default core file size
00344   getrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00345 
00346   //prepare IPC semaphore for getting the workloop waked up on signal caught in slaves
00347   #ifdef linux
00348   if (sigmon_sem_==0) {
00349     sigmon_sem_ = (sem_t*)mmap(NULL, sizeof(sem_t),
00350         PROT_READ | PROT_WRITE,
00351         MAP_ANONYMOUS | MAP_SHARED, 0, 0);
00352     if (!sigmon_sem_) {
00353       perror("mmap error\n");
00354       std::cout << "mmap error"<<std::endl;
00355     }
00356     else
00357       sem_init(sigmon_sem_,true,0);
00358   }
00359   #endif
00360 }
00361 //___________here ends the *huge* constructor___________________________________
00362 
00363 
00364 //______________________________________________________________________________
00365 FUEventProcessor::~FUEventProcessor()
00366 {
00367   // no longer needed since the wrapper is a member of the class and one can rely on 
00368   // implicit destructor - to be revised - at any rate the most common exit path is via "kill"...
00369   //  if (evtProcessor_) delete evtProcessor_;
00370   if(vulture_ != 0) delete vulture_;
00371 }
00372 
00373 
00375 // implementation of member functions
00377 
00378 
00379 //______________________________________________________________________________
00380 bool FUEventProcessor::configuring(toolbox::task::WorkLoop* wl)
00381 {
00382 //   std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
00383 //          << ((instance_.value_==0) ? 0x8 : 0) << " "
00384 //          << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
00385 //          << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
00386 //          << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
00387   unsigned short smap 
00388     = (datasetCounting_.value_ ? 0x20 : 0 )
00389     + ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00390     + (((instance_.value_%80)==0) ? 0x8 : 0) // have at least one legend per slice
00391     + (hasServiceWebRegistry_.value_ ? 0x4 : 0) 
00392     + (hasModuleWebRegistry_.value_ ? 0x2 : 0) 
00393     + (hasPrescaleService_.value_ ? 0x1 : 0);
00394   if(nbSubProcesses_.value_==0) 
00395     {
00396       spMStates_.setSize(1); 
00397       spmStates_.setSize(1); 
00398     }
00399   else
00400     {
00401       spMStates_.setSize(nbSubProcesses_.value_);
00402       spmStates_.setSize(nbSubProcesses_.value_);
00403       for(unsigned int i = 0; i < spMStates_.size(); i++)
00404         {
00405           spMStates_[i] = edm::event_processor::sInit; 
00406           spmStates_[i] = 0; 
00407         }
00408     }
00409   try {
00410     LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ...");
00411     std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00412     epInitialized_=true;
00413     if(evtProcessor_)
00414       {
00415         //get ref of mwr
00416         mwrRef_ = evtProcessor_.getModuleWebRegistry();
00417         sorRef_ = evtProcessor_.getShmOutputModuleRegistry();
00418         // moved to wrapper class
00419         configuration_ = evtProcessor_.configuration();
00420         if(nbSubProcesses_.value_==0) evtProcessor_.startMonitoringWorkLoop(); 
00421         evtProcessor_->beginJob();
00422         evtProcessor_.setupFastTimerService(nbSubProcesses_.value_>0 ? nbSubProcesses_.value_:1);
00423         if(cpustat_) {delete cpustat_; cpustat_=0;}
00424         cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00425                                nbSubProcesses_.value_,
00426                                instance_.value_,
00427                                iDieUrl_.value_);
00428         if(ratestat_) {delete ratestat_; ratestat_=0;}
00429         ratestat_ = new RateStat(iDieUrl_.value_);
00430         if(iDieStatisticsGathering_.value_)
00431         {
00432           try
00433           {
00434             cpustat_->sendLegenda(evtProcessor_.getmicromap());
00435             xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00436             if(legenda !=0)
00437             {
00438               std::string slegenda = ((xdata::String*)legenda)->value_;
00439               ratestat_->sendLegenda(slegenda);
00440             }
00441             if (sorRef_ && datasetCounting_.value_)
00442             {
00443               xdata::String dsLegenda =  sorRef_->getDatasetCSV();
00444               if (dsLegenda.value_.size())
00445                 ratestat_->sendAuxLegenda(dsLegenda);
00446             }
00447           }
00448           catch(evf::Exception &e)
00449           {
00450             LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
00451                 << e.what());
00452           }
00453           catch (xcept::Exception& e) {
00454             LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda."
00455                 << e.what());
00456           }
00457         }
00458 
00459         fsm_.fireEvent("ConfigureDone",this);
00460         LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!");
00461         localLog("-I- Configuration completed");
00462 
00463       }
00464   }
00465   catch (xcept::Exception &e) {
00466     reasonForFailedState_ = "configuring FAILED: " + (std::string)e.what();
00467     fsm_.fireFailed(reasonForFailedState_,this);
00468     localLog(reasonForFailedState_);
00469   }
00470   catch(cms::Exception &e) {
00471     reasonForFailedState_ = e.explainSelf();
00472     fsm_.fireFailed(reasonForFailedState_,this);
00473     localLog(reasonForFailedState_);
00474   }    
00475   catch(std::exception &e) {
00476     reasonForFailedState_ = e.what();
00477     fsm_.fireFailed(reasonForFailedState_,this);
00478     localLog(reasonForFailedState_);
00479   }
00480   catch(...) {
00481     fsm_.fireFailed("Unknown Exception",this);
00482   }
00483 
00484   if(vulture_!=0 && vp_ == 0) vp_ = vulture_->makeProcess();
00485 
00486   return false;
00487 }
00488 
00489 
00490 
00491 
00492 //______________________________________________________________________________
00493 bool FUEventProcessor::enabling(toolbox::task::WorkLoop* wl)
00494 {
00495   nbTotalDQM_ = 0;
00496   scalersUpdates_ = 0;
00497   idleProcStats_ = 0;
00498   allProcStats_ = 0;
00499 //   std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
00500 //          << ((instance_.value_==0) ? 0x8 : 0) << " "
00501 //          << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
00502 //          << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
00503 //          << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
00504   unsigned short smap 
00505     = (datasetCounting_.value_ ? 0x20 : 0 )
00506     + ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00507     + (((instance_.value_%80)==0) ? 0x8 : 0) // have at least one legend per slice
00508     + (hasServiceWebRegistry_.value_ ? 0x4 : 0) 
00509     + (hasModuleWebRegistry_.value_ ? 0x2 : 0) 
00510     + (hasPrescaleService_.value_ ? 0x1 : 0);
00511 
00512   LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling...");
00513   
00514   //reset core limit size
00515   if (rlimit_coresize_changed_)
00516     setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00517   rlimit_coresize_changed_=false;
00518   crashesThisRun_=0;
00519   //recreate signal monitor sem
00520   sem_destroy(sigmon_sem_);
00521   sem_init(sigmon_sem_,true,0);
00522 
00523   if(!epInitialized_){
00524     evtProcessor_.forceInitEventProcessorMaybe();
00525   }
00526   std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00527   
00528   mwrRef_ = evtProcessor_.getModuleWebRegistry();
00529   sorRef_ = evtProcessor_.getShmOutputModuleRegistry();
00530 
00531   if(!epInitialized_){
00532     evtProcessor_->beginJob(); 
00533     evtProcessor_.setupFastTimerService(nbSubProcesses_.value_>0 ? nbSubProcesses_.value_:1);
00534     if(cpustat_) {delete cpustat_; cpustat_=0;}
00535     cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00536                            nbSubProcesses_.value_,
00537                            instance_.value_,
00538                            iDieUrl_.value_);
00539     if(ratestat_) {delete ratestat_; ratestat_=0;}
00540     ratestat_ = new RateStat(iDieUrl_.value_);
00541     if(iDieStatisticsGathering_.value_)
00542     {
00543       try
00544       {
00545         cpustat_->sendLegenda(evtProcessor_.getmicromap());
00546         xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00547         if(legenda !=0)
00548         {
00549           std::string slegenda = ((xdata::String*)legenda)->value_;
00550           ratestat_->sendLegenda(slegenda);
00551         }
00552         if (sorRef_ && datasetCounting_.value_)
00553         {
00554           xdata::String dsLegenda =  sorRef_->getDatasetCSV();
00555           if (dsLegenda.value_.size())
00556             ratestat_->sendAuxLegenda(dsLegenda);
00557         }
00558       }
00559       catch(evf::Exception &e)
00560       {
00561         LOG4CPLUS_INFO(getApplicationLogger(),"could not send legenda"
00562             << e.what());
00563       }
00564       catch (xcept::Exception& e) {
00565         LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda."
00566             << e.what());
00567       }
00568     }
00569     epInitialized_ = true;
00570   }
00571   configuration_ = evtProcessor_.configuration(); // get it again after init has been carried out...
00572   evtProcessor_.resetLumiSectionReferenceIndex();
00573   //classic appl will return here 
00574   if(nbSubProcesses_.value_==0) return enableClassic();
00575   //protect manipulation of subprocess array
00576   pthread_mutex_lock(&start_lock_);
00577   pthread_mutex_lock(&pickup_lock_);
00578   subs_.clear();
00579   subs_.resize(nbSubProcesses_.value_); // this should not be necessary
00580   pid_t retval = -1;
00581 
00582   for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00583     {
00584       subs_[i]=SubProcess(i,retval); //this will replace all the scattered variables
00585     }
00586   pthread_mutex_unlock(&pickup_lock_);
00587 
00588   pthread_mutex_unlock(&start_lock_);
00589 
00590   //set expected number of child EP's for the Init message(s) sent to the SM
00591   try {
00592     if (sorRef_) {
00593       unsigned int nbExpectedEPWriters = nbSubProcesses_.value_;
00594       if (nbExpectedEPWriters==0) nbExpectedEPWriters=1;//master instance processing
00595       std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
00596       for (unsigned int i=0;i<shmOutputs.size();i++) {
00597         shmOutputs[i]->setNExpectedEPs(nbExpectedEPWriters);
00598       }
00599     }
00600   }
00601   catch (...)
00602   {
00603     LOG4CPLUS_ERROR(getApplicationLogger(),"Thrown Exception while setting nExpectedEPs in shmOutputs");
00604   }
00605 
00606   //use new method if configured
00607   edm_init_done_=true;
00608   if (forkInEDM_.value_) {
00609     edm_init_done_=false;
00610     enableForkInEDM();
00611   }
00612   else
00613     for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00614     {
00615       retval = subs_[i].forkNew();
00616       if(retval==0)
00617         {
00618           myProcess_ = &subs_[i];
00619           // dirty hack: delete/recreate global binary semaphore for later use in child
00620           delete toolbox::mem::_s_mutex_ptr_;
00621           toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
00622           int retval = pthread_mutex_destroy(&stop_lock_);
00623           if(retval != 0) perror("error");
00624           retval = pthread_mutex_init(&stop_lock_,0);
00625           if(retval != 0) perror("error");
00626           fsm_.disableRcmsStateNotification();
00627           
00628           return enableMPEPSlave();
00629           // the loop is broken in the child 
00630         }
00631     }
00632 
00633   if (forkInEDM_.value_) {
00634 
00635     edm::event_processor::State st;
00636     while (!edm_init_done_) {
00637       usleep(10000);
00638       st = evtProcessor_->getState();
00639       if (st==edm::event_processor::sError || st==edm::event_processor::sInvalid) break;
00640     }
00641     st = evtProcessor_->getState();
00642     //error handling: EP must fork during sRunning
00643     if (st!=edm::event_processor::sRunning) {
00644       reasonForFailedState_ = std::string("Master edm::EventProcessor in state ") + evtProcessor_->stateName(st);
00645       localLog(reasonForFailedState_);
00646       fsm_.fireFailed(reasonForFailedState_,this);
00647       return false;
00648     }
00649 
00650     sleep(1);
00651     startSummarizeWorkLoop();
00652     startSignalMonitorWorkLoop();//only with new forking
00653     vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00654 
00655     //enable after we are done with conditions loading and forking
00656     LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00657     fsm_.fireEvent("EnableDone",this);
00658     localLog("-I- Start completed");
00659     return false;
00660   }
00661 
00662   startSummarizeWorkLoop();
00663   vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00664   LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00665   fsm_.fireEvent("EnableDone",this);
00666   localLog("-I- Start completed");
00667   return false;
00668 }
00669 
00670 bool FUEventProcessor::doEndRunInEDM() {
00671   //taking care that EP in master is in stoppable state
00672   if (forkInfoObj_) {
00673 
00674     int count = 30;
00675     bool waitedForEDM=false;
00676     while (!edm_init_done_ && count) {
00677       ::sleep(1);
00678       if (count%5==0)
00679         LOG4CPLUS_WARN(log_,"MASTER EP: Stopping while EP busy in beginRun. waiting " <<count<< "sec");
00680       count--;
00681       waitedForEDM=true;
00682     }
00683     //sleep a few more seconds it was early stop
00684     if (waitedForEDM) sleep(5);
00685 
00686     //if (count==0) fsm_.fireFailed("failed to stop Master EP",this);
00687 
00688     if (evtProcessor_->getState()==edm::event_processor::sJobReady)
00689       return true;//we are already done
00690 
00691     forkInfoObj_->lock();
00692     forkInfoObj_->stopCondition=true;
00693     sem_post(forkInfoObj_->control_sem_);
00694     forkInfoObj_->unlock();
00695 
00696     count = 30;
00697 
00698     edm::event_processor::State st;
00699     while(count--) {
00700       st = evtProcessor_->getState();
00701       if (st==edm::event_processor::sRunning) ::usleep(100000);
00702       else if (st==edm::event_processor::sStopping || st==edm::event_processor::sJobReady) {
00703         break;
00704       }
00705       else {
00706         std::ostringstream ost;
00707         ost << "Master edm::EventProcessor is in state "<< evtProcessor_->stateName(st) << " while stopping";
00708         LOG4CPLUS_ERROR(getApplicationLogger(),ost.str());
00709         fsm_.fireFailed(ost.str(),this);
00710         return false;
00711       }
00712       if (count%5==0 && st==edm::event_processor::sRunning && !forkInfoObj_->receivedStop_) {
00713         forkInfoObj_->lock();
00714         forkInfoObj_->stopCondition=true;
00715         sem_post(forkInfoObj_->control_sem_);
00716         forkInfoObj_->unlock();
00717         LOG4CPLUS_WARN(getApplicationLogger(),
00718           "Master edm::EventProcessor still running after "<< (30-count-1) << " seconds. \"sem_post\" was executed again" );
00719       }
00720     }
00721     if (count<0) {
00722       std::ostringstream ost;
00723       if (!forkInfoObj_->receivedStop_)
00724         ost << "Timeout waiting for Master edm::EventProcessor to go stopping state "
00725             << evtProcessor_->stateName(st) << ": input source did not receive stop signal!";
00726       else
00727         ost << "Timeout waiting for Master edm::EventProcessor to go stopping state "<<evtProcessor_->stateName(st);
00728       LOG4CPLUS_ERROR(getApplicationLogger(),ost.str());
00729       fsm_.fireFailed(ost.str(),this);
00730       return false;
00731     }
00732   }
00733   return true;
00734 }
00735 
00736 //______________________________________________________________________________
00737 bool FUEventProcessor::stopping(toolbox::task::WorkLoop* wl)
00738 {
00739   setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00740   if(nbSubProcesses_.value_!=0) {
00741     stopSlavesAndAcknowledge();
00742     if (forkInEDM_.value_) {
00743             //only in new forking for now
00744             sem_post(sigmon_sem_);
00745             if (!doEndRunInEDM())
00746               return false;
00747     }
00748   }
00749   vulture_->stop();
00750 
00751   if (forkInEDM_.value_) {
00752     //shared memory was already disconnected in master
00753     bool tmpHasShMem_=hasShMem_;
00754     hasShMem_=false;
00755     stopClassic();
00756     hasShMem_=tmpHasShMem_;
00757     return false;
00758   }
00759   stopClassic();
00760   return false;
00761 }
00762 
00763 
00764 //______________________________________________________________________________
00765 bool FUEventProcessor::halting(toolbox::task::WorkLoop* wl)
00766 {
00767   LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ...");
00768   setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00769   if(nbSubProcesses_.value_!=0) { 
00770     stopSlavesAndAcknowledge();
00771     if (forkInEDM_.value_) {
00772             sem_post(sigmon_sem_);
00773             if (!doEndRunInEDM())
00774               return false;
00775 
00776     }
00777   }
00778   try{
00779     evtProcessor_.stopAndHalt();
00780     //cleanup forking variables
00781     if (forkInfoObj_) {
00782       delete forkInfoObj_;
00783       forkInfoObj_=0;
00784     }
00785   }
00786   catch (evf::Exception &e) {
00787     reasonForFailedState_ = "halting FAILED: " + (std::string)e.what();
00788     localLog(reasonForFailedState_);
00789     fsm_.fireFailed(reasonForFailedState_,this);
00790   }
00791   //  if(hasShMem_) detachDqmFromShm();
00792 
00793   LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!");
00794   fsm_.fireEvent("HaltDone",this);
00795 
00796   localLog("-I- Halt completed");
00797   return false;
00798 }
00799 
00800 
00801 //______________________________________________________________________________
00802 xoap::MessageReference FUEventProcessor::fsmCallback(xoap::MessageReference msg)
00803   throw (xoap::exception::Exception)
00804 {
00805   return fsm_.commandCallback(msg);
00806 }
00807 
00808 
00809 //______________________________________________________________________________
00810 void FUEventProcessor::actionPerformed(xdata::Event& e)
00811 {
00812 
00813   if (e.type()=="ItemChangedEvent" && fsm_.stateName()->toString()!="Halted") {
00814     std::string item = dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00815     
00816     if ( item == "parameterSet") {
00817       LOG4CPLUS_WARN(getApplicationLogger(),
00818                      "HLT Menu changed, force reinitialization of EventProcessor");
00819       epInitialized_ = false;
00820     }
00821     if ( item == "outputEnabled") {
00822       if(outprev_ != outPut_) {
00823         LOG4CPLUS_WARN(getApplicationLogger(),
00824                        (outprev_ ? "Disabling " : "Enabling ")<<"global output");
00825         evtProcessor_->enableEndPaths(outPut_);
00826         outprev_ = outPut_;
00827       }
00828     }
00829     if (item == "globalInputPrescale") {
00830       LOG4CPLUS_WARN(getApplicationLogger(),
00831                      "Setting global input prescale has no effect "
00832                      <<"in this version of the code");
00833     }
00834     if ( item == "globalOutputPrescale") {
00835       LOG4CPLUS_WARN(getApplicationLogger(),
00836                      "Setting global output prescale has no effect "
00837                      <<"in this version of the code");
00838     }
00839   }
00840   
00841 }
00842 
00843 //______________________________________________________________________________
00844 void FUEventProcessor::getSlavePids(xgi::Input  *in, xgi::Output *out) throw (xgi::exception::Exception)
00845 {
00846   for (unsigned int i=0;i<subs_.size();i++)
00847   {
00848     if (i!=0) *out << ",";
00849     *out << subs_[i].pid();
00850   }
00851 }
00852 //______________________________________________________________________________
00853 void FUEventProcessor::subWeb(xgi::Input  *in, xgi::Output *out) throw (xgi::exception::Exception)
00854 {
00855   using namespace cgicc;
00856   pid_t pid = 0;
00857   std::ostringstream ost;
00858   ost << "&";
00859 
00860   Cgicc cgi(in);
00861   internal::MyCgi *mycgi = (internal::MyCgi*)in;
00862   for(std::map<std::string, std::string, std::less<std::string> >::iterator mit = 
00863         mycgi->getEnvironment().begin();
00864       mit != mycgi->getEnvironment().end(); mit++)
00865     ost << mit->first << "%" << mit->second << ";";
00866   std::vector<FormEntry> els = cgi.getElements() ;
00867   std::vector<FormEntry> el1;
00868   cgi.getElement("method",el1);
00869   std::vector<FormEntry> el2;
00870   cgi.getElement("process",el2);
00871   if(el1.size()!=0) {
00872     std::string meth = el1[0].getValue();
00873     if(el2.size()!=0) {
00874       unsigned int i = 0;
00875       std::string mod = el2[0].getValue();
00876       pid = atoi(mod.c_str()); // get the process id to be polled
00877       for(; i < subs_.size(); i++)
00878         if(subs_[i].pid()==pid) break;
00879       if(i>=subs_.size()){ //process was not found, let the browser know
00880         *out << "ERROR 404 : Process " << pid << " Not Found !" << std::endl;
00881         return;
00882       } 
00883       if(subs_[i].alive() != 1){
00884         *out << "ERROR 405 : Process " << pid << " Not Alive !" << std::endl;
00885         return;
00886       }
00887       MsgBuf msg1(meth.length()+ost.str().length()+1,MSQM_MESSAGE_TYPE_WEB);
00888       strncpy(msg1->mtext,meth.c_str(),meth.length());
00889       strncpy(msg1->mtext+meth.length(),ost.str().c_str(),ost.str().length());
00890       subs_[i].post(msg1,true);
00891       unsigned int keep_supersleep_original_value = superSleepSec_.value_;
00892       superSleepSec_.value_=10*keep_supersleep_original_value;
00893       MsgBuf msg(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_WEB);
00894       bool done = false;
00895       std::vector<char *>pieces;
00896       while(!done){
00897         unsigned long retval1 = subs_[i].rcvNonBlocking(msg,true);
00898         if(retval1 == MSGQ_MESSAGE_TYPE_RANGE){
00899 	  ::sleep(1);
00900           continue;
00901         }
00902         unsigned int nbytes = atoi(msg->mtext);
00903         if(nbytes < MAX_PIPE_BUFFER_SIZE) done = true; // this will break the while loop
00904         char *buf= new char[nbytes];
00905         ssize_t retval = read(anonymousPipe_[PIPE_READ],buf,nbytes);
00906         if(retval<0){
00907           std::cout << "Failed to read from pipe." << std::endl;
00908           continue;
00909         }
00910         if(static_cast<unsigned int>(retval) != nbytes) std::cout 
00911           << "CAREFUL HERE, read less bytes than expected from pipe in subWeb" << std::endl;
00912         pieces.push_back(buf);
00913       }
00914       superSleepSec_.value_=keep_supersleep_original_value;
00915       for(unsigned int j = 0; j < pieces.size(); j++){
00916         *out<<pieces[j];    // chain the buffers into the output strstream
00917         delete[] pieces[j]; //make sure to release all buffers used for reading the pipe
00918       }
00919     }
00920   }
00921 }
00922 
00923 
00924 //______________________________________________________________________________
00925 void FUEventProcessor::defaultWebPage(xgi::Input  *in, xgi::Output *out)
00926   throw (xgi::exception::Exception)
00927 {
00928 
00929 
00930   *out << "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">" 
00931        << "<html><head><title>" << getApplicationDescriptor()->getClassName() << (nbSubProcesses_.value_ > 0 ? "MP " : " ") 
00932        << getApplicationDescriptor()->getInstance() << "</title>"
00933        << "<meta http-equiv=\"REFRESH\" content=\"0;url=/evf/html/defaultBasePage.html\">"
00934        << "</head></html>";
00935 }
00936 
00937 
00938 //______________________________________________________________________________
00939 
00940 
00941 void FUEventProcessor::spotlightWebPage(xgi::Input  *in, xgi::Output *out)
00942   throw (xgi::exception::Exception)
00943 {
00944 
00945   std::string urn = getApplicationDescriptor()->getURN();
00946 
00947   *out << "<!-- base href=\"/" <<  urn
00948        << "\"> -->" << std::endl;
00949   *out << "<html>"                                                   << std::endl;
00950   *out << "<head>"                                                   << std::endl;
00951   *out << "<link type=\"text/css\" rel=\"stylesheet\"";
00952   *out << " href=\"/evf/html/styles.css\"/>"                   << std::endl;
00953   *out << "<title>" << getApplicationDescriptor()->getClassName() 
00954        << getApplicationDescriptor()->getInstance() 
00955        << " MAIN</title>"     << std::endl;
00956   *out << "</head>"                                                  << std::endl;
00957   *out << "<body>"                                                   << std::endl;
00958   *out << "<table border=\"0\" width=\"100%\">"                      << std::endl;
00959   *out << "<tr>"                                                     << std::endl;
00960   *out << "  <td align=\"left\">"                                    << std::endl;
00961   *out << "    <img"                                                 << std::endl;
00962   *out << "     align=\"middle\""                                    << std::endl;
00963   *out << "     src=\"/evf/images/spoticon.jpg\""                            << std::endl;
00964   *out << "     alt=\"main\""                                        << std::endl;
00965   *out << "     width=\"64\""                                        << std::endl;
00966   *out << "     height=\"64\""                                       << std::endl;
00967   *out << "     border=\"\"/>"                                       << std::endl;
00968   *out << "    <b>"                                                  << std::endl;
00969   *out << getApplicationDescriptor()->getClassName() 
00970        << getApplicationDescriptor()->getInstance()                  << std::endl;
00971   *out << "      " << fsm_.stateName()->toString()                   << std::endl;
00972   *out << "    </b>"                                                 << std::endl;
00973   *out << "  </td>"                                                  << std::endl;
00974   *out << "  <td width=\"32\">"                                      << std::endl;
00975   *out << "    <a href=\"/urn:xdaq-application:lid=3\">"             << std::endl;
00976   *out << "      <img"                                               << std::endl;
00977   *out << "       align=\"middle\""                                  << std::endl;
00978   *out << "       src=\"/hyperdaq/images/HyperDAQ.jpg\""             << std::endl;
00979   *out << "       alt=\"HyperDAQ\""                                  << std::endl;
00980   *out << "       width=\"32\""                                      << std::endl;
00981   *out << "       height=\"32\""                                     << std::endl;
00982   *out << "       border=\"\"/>"                                     << std::endl;
00983   *out << "    </a>"                                                 << std::endl;
00984   *out << "  </td>"                                                  << std::endl;
00985   *out << "  <td width=\"32\">"                                      << std::endl;
00986   *out << "  </td>"                                                  << std::endl;
00987   *out << "  <td width=\"32\">"                                      << std::endl;
00988   *out << "    <a href=\"/" << urn << "/\">"                         << std::endl;
00989   *out << "      <img"                                               << std::endl;
00990   *out << "       align=\"middle\""                                  << std::endl;
00991   *out << "       src=\"/evf/images/epicon.jpg\""                    << std::endl;
00992   *out << "       alt=\"main\""                                      << std::endl;
00993   *out << "       width=\"32\""                                      << std::endl;
00994   *out << "       height=\"32\""                                     << std::endl;
00995   *out << "       border=\"\"/>"                                     << std::endl;
00996   *out << "    </a>"                                                 << std::endl;
00997   *out << "  </td>"                                                  << std::endl;
00998   *out << "</tr>"                                                    << std::endl;
00999   *out << "</table>"                                                 << std::endl;
01000 
01001   *out << "<hr/>"                                                    << std::endl;
01002   
01003   std::ostringstream ost;
01004   if(myProcess_) 
01005     ost << "/SubWeb?process=" << getpid() << "&method=moduleWeb&";
01006   else
01007     ost << "/moduleWeb?";
01008   urn += ost.str();
01009   if(evtProcessor_ && (myProcess_ || nbSubProcesses_.value_==0))
01010     evtProcessor_.taskWebPage(in,out,urn);
01011   else if(evtProcessor_)
01012     evtProcessor_.summaryWebPage(in,out,urn);
01013   else
01014     *out << "<td>HLT Unconfigured</td>" << std::endl;
01015   *out << "</table>"                                                 << std::endl;
01016   
01017   *out << "<br><textarea rows=" << 10 << " cols=80 scroll=yes>"      << std::endl;
01018   *out << configuration_                                             << std::endl;
01019   *out << "</textarea><P>"                                           << std::endl;
01020   
01021   *out << "</body>"                                                  << std::endl;
01022   *out << "</html>"                                                  << std::endl;
01023 
01024 
01025 }
01026 void FUEventProcessor::scalersWeb(xgi::Input  *in, xgi::Output *out)
01027   throw (xgi::exception::Exception)
01028 {
01029 
01030   out->getHTTPResponseHeader().addHeader( "Content-Type",
01031                                           "application/octet-stream" );
01032   out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
01033                                           "binary" );
01034   if(evtProcessor_ != 0){
01035     out->write( (char*)(evtProcessor_.getPackedTriggerReportAsStruct()), sizeof(TriggerReportStatic));
01036   }
01037 }
01038 
01039 void FUEventProcessor::pathNames(xgi::Input  *in, xgi::Output *out)
01040   throw (xgi::exception::Exception)
01041 {
01042 
01043   if(evtProcessor_ != 0){
01044     xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
01045     if(legenda !=0){
01046       std::string slegenda = ((xdata::String*)legenda)->value_;
01047       *out << slegenda << std::endl;
01048     }
01049   }
01050 }
01051 
01052 
01053 void FUEventProcessor::setAttachDqmToShm() throw (evf::Exception)  
01054 {
01055   std::string errmsg;
01056   try {
01057     edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
01058     if(edm::Service<FUShmDQMOutputService>().isAvailable())
01059       edm::Service<FUShmDQMOutputService>()->setAttachToShm();
01060   }
01061   catch (cms::Exception& e) {
01062     errmsg = "Failed to set to attach DQM service to shared memory: " + (std::string)e.what();
01063   }
01064   if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
01065 }
01066 
01067 
01068 void FUEventProcessor::attachDqmToShm() throw (evf::Exception)  
01069 {
01070   std::string errmsg;
01071   bool success = false;
01072   try {
01073     edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
01074     if(edm::Service<FUShmDQMOutputService>().isAvailable())
01075       success = edm::Service<FUShmDQMOutputService>()->attachToShm();
01076     if (!success) errmsg = "Failed to attach DQM service to shared memory";
01077   }
01078   catch (cms::Exception& e) {
01079     errmsg = "Failed to attach DQM service to shared memory: " + (std::string)e.what();
01080   }
01081   if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
01082 }
01083 
01084 
01085 
01086 void FUEventProcessor::detachDqmFromShm() throw (evf::Exception)
01087 {
01088   std::string errmsg;
01089   bool success = false;
01090   try {
01091     edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
01092     if(edm::Service<FUShmDQMOutputService>().isAvailable())
01093       success = edm::Service<FUShmDQMOutputService>()->detachFromShm();
01094     if (!success) errmsg = "Failed to detach DQM service from shared memory";
01095   }
01096   catch (cms::Exception& e) {
01097     errmsg = "Failed to detach DQM service from shared memory: " + (std::string)e.what();
01098   }
01099   if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
01100 }
01101 
01102 
01103 std::string FUEventProcessor::logsAsString()
01104 {
01105   std::ostringstream oss;
01106   if(logWrap_)
01107     {
01108       for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
01109         oss << logRing_[i] << std::endl;
01110       for(unsigned int i = 0; i <  logRingIndex_; i++)
01111         oss << logRing_[i] << std::endl;
01112     }
01113   else
01114       for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
01115         oss << logRing_[i] << std::endl;
01116     
01117   return oss.str();
01118 }
01119   
01120 void FUEventProcessor::localLog(std::string m)
01121 {
01122   timeval tv;
01123 
01124   gettimeofday(&tv,0);
01125   tm *uptm = localtime(&tv.tv_sec);
01126   char datestring[256];
01127   strftime(datestring, sizeof(datestring),"%c", uptm);
01128 
01129   if(logRingIndex_ == 0){logWrap_ = true; logRingIndex_ = logRingSize_;}
01130   logRingIndex_--;
01131   std::ostringstream timestamp;
01132   timestamp << " at " << datestring;
01133   m += timestamp.str();
01134   logRing_[logRingIndex_] = m;
01135 }
01136 
01137 void FUEventProcessor::startSupervisorLoop()
01138 {
01139   try {
01140     wlSupervising_=
01141       toolbox::task::getWorkLoopFactory()->getWorkLoop("Supervisor",
01142                                                        "waiting");
01143     if (!wlSupervising_->isActive()) wlSupervising_->activate();
01144     asSupervisor_ = toolbox::task::bind(this,&FUEventProcessor::supervisor,
01145                                         "Supervisor");
01146     wlSupervising_->submit(asSupervisor_);
01147     supervising_ = true;
01148   }
01149   catch (xcept::Exception& e) {
01150     std::string msg = "Failed to start workloop 'Supervisor'.";
01151     XCEPT_RETHROW(evf::Exception,msg,e);
01152   }
01153 }
01154 
01155 void FUEventProcessor::startReceivingLoop()
01156 {
01157   try {
01158     wlReceiving_=
01159       toolbox::task::getWorkLoopFactory()->getWorkLoop("Receiving",
01160                                                        "waiting");
01161     if (!wlReceiving_->isActive()) wlReceiving_->activate();
01162     asReceiveMsgAndExecute_ = toolbox::task::bind(this,&FUEventProcessor::receiving,
01163                                         "Receiving");
01164     wlReceiving_->submit(asReceiveMsgAndExecute_);
01165     receiving_ = true;
01166   }
01167   catch (xcept::Exception& e) {
01168     std::string msg = "Failed to start workloop 'Receiving'.";
01169     XCEPT_RETHROW(evf::Exception,msg,e);
01170   }
01171 }
01172 void FUEventProcessor::startReceivingMonitorLoop()
01173 {
01174   try {
01175     wlReceivingMonitor_=
01176       toolbox::task::getWorkLoopFactory()->getWorkLoop("ReceivingM",
01177                                                        "waiting");
01178     if (!wlReceivingMonitor_->isActive()) 
01179       wlReceivingMonitor_->activate();
01180     asReceiveMsgAndRead_ = 
01181       toolbox::task::bind(this,&FUEventProcessor::receivingAndMonitor,
01182                           "ReceivingM");
01183     wlReceivingMonitor_->submit(asReceiveMsgAndRead_);
01184     receivingM_ = true;
01185   }
01186   catch (xcept::Exception& e) {
01187     std::string msg = "Failed to start workloop 'ReceivingM'.";
01188     XCEPT_RETHROW(evf::Exception,msg,e);
01189   }
01190 }
01191 
01192 bool FUEventProcessor::receiving(toolbox::task::WorkLoop *)
01193 {
01194   MsgBuf msg;
01195   try{
01196     myProcess_->rcvSlave(msg,false); //will receive only messages from Master
01197     if(msg->mtype==MSQM_MESSAGE_TYPE_RLI)
01198       {
01199         rlimit rl;
01200         getrlimit(RLIMIT_CORE,&rl);
01201         rl.rlim_cur=0;
01202         setrlimit(RLIMIT_CORE,&rl);
01203         rlimit_coresize_changed_=true;
01204       }
01205     if (msg->mtype==MSQM_MESSAGE_TYPE_RLR)
01206       {
01207         //reset coresize limit
01208         setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
01209         rlimit_coresize_changed_=false;
01210       }
01211     if(msg->mtype==MSQM_MESSAGE_TYPE_STOP)
01212       {
01213         pthread_mutex_lock(&stop_lock_);
01214         try {
01215           fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
01216         }
01217         catch (...) {
01218           LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to go to Stopping state in slave EP, pid "
01219                                                  << getpid() << " The state on Stop event was not consistent");
01220         }
01221 
01222         try {
01223           stopClassic(); // call the normal sequence of stopping - as this is allowed to fail provisions must be made ...@@@EM
01224         }
01225         catch (...) {
01226           LOG4CPLUS_ERROR(getApplicationLogger(),"Slave EP 'receiving' workloop: exception " << getpid());
01227         }
01228 
01229         //destroy MessageService thread before exit     
01230         try{
01231           messageServicePresence_.reset();
01232         }
01233         catch(...) {
01234           LOG4CPLUS_ERROR(getApplicationLogger(),"SLAVE:Unable to destroy MessageServicePresence. pid:" << getpid() );
01235         }
01236 
01237         MsgBuf msg1(0,MSQS_MESSAGE_TYPE_STOP);
01238         myProcess_->postSlave(msg1,false);
01239         pthread_mutex_unlock(&stop_lock_);
01240         fclose(stdout);
01241         fclose(stderr);
01242         _exit(EXIT_SUCCESS);
01243       }
01244     if(msg->mtype==MSQM_MESSAGE_TYPE_FSTOP)
01245       _exit(EXIT_SUCCESS);
01246   }
01247   catch(evf::Exception &e){
01248     LOG4CPLUS_ERROR(getApplicationLogger(),"Slave EP pid:" << getpid() << " receiving WorkLoop exception: "<<e.what());
01249   }
01250   return true;
01251 }
01252 
01253 bool FUEventProcessor::supervisor(toolbox::task::WorkLoop *)
01254 {
01255   pthread_mutex_lock(&stop_lock_);
01256   if(subs_.size()!=nbSubProcesses_.value_)
01257     {
01258       pthread_mutex_lock(&pickup_lock_);
01259       if(subs_.size()!=nbSubProcesses_.value_) {
01260         subs_.resize(nbSubProcesses_.value_);
01261         spMStates_.resize(nbSubProcesses_.value_);
01262         spmStates_.resize(nbSubProcesses_.value_);
01263         for(unsigned int i = 0; i < spMStates_.size(); i++)
01264           {
01265             spMStates_[i] = edm::event_processor::sInit; 
01266             spmStates_[i] = 0; 
01267           }
01268       }
01269       pthread_mutex_unlock(&pickup_lock_);
01270     }
01271   bool running = fsm_.stateName()->toString()=="Enabled";
01272   bool stopping = fsm_.stateName()->toString()=="stopping";
01273   for(unsigned int i = 0; i < subs_.size(); i++)
01274     {
01275       if(subs_[i].alive()==-1000) continue;
01276       int sl;
01277       pid_t sub_pid = subs_[i].pid();
01278       pid_t killedOrNot = waitpid(sub_pid,&sl,WNOHANG);
01279 
01280       if(killedOrNot && killedOrNot==sub_pid) {
01281         pthread_mutex_lock(&pickup_lock_);
01282         //check if out of range or recreated (enable can clear vector)
01283         if (i<subs_.size() && subs_[i].alive()!=-1000) {
01284           subs_[i].setStatus((WIFEXITED(sl) != 0 ? 0 : -1));
01285           std::ostringstream ost;
01286           if(subs_[i].alive()==0) ost << " process exited with status " << WEXITSTATUS(sl);
01287           else if(WIFSIGNALED(sl)!=0) {
01288             ost << " process terminated with signal " << WTERMSIG(sl);
01289           }
01290           else ost << " process stopped ";
01291           //report unexpected slave exit in stop
01292           //if (stopping && (WEXITSTATUS(sl)!=0 || WIFSIGNALED(sl)!=0)) {
01293           //  LOG4CPLUS_WARN(getApplicationLogger(),ost.str() << ", slave pid:"<<getpid());
01294           //}
01295           subs_[i].countdown()=slaveRestartDelaySecs_.value_;
01296           subs_[i].setReasonForFailed(ost.str());
01297           spMStates_[i] = evtProcessor_.notstarted_state_code();
01298           spmStates_[i] = 0;
01299           std::ostringstream ost1;
01300           ost1 << "-E- Slave " << subs_[i].pid() << ost.str();
01301           localLog(ost1.str());
01302           if(!autoRestartSlaves_.value_) subs_[i].disconnect();
01303         }
01304         pthread_mutex_unlock(&pickup_lock_);
01305       }
01306     }
01307   pthread_mutex_unlock(&stop_lock_);    
01308   if(stopping) return true; // if in stopping we are done
01309 
01310   // check if we need to reset core dumps (15 min after last one)
01311   if (running && rlimit_coresize_changed_) {
01312     timeval newtv;
01313     gettimeofday(&newtv,0);
01314     int delta = newtv.tv_sec-lastCrashTime_.tv_sec;
01315     if (delta>60*15) {
01316       std::ostringstream ostr;
01317       ostr << " No more slave EP crashes on this machine in last 15 min. resetting core size limits";
01318       std::cout << ostr.str() << std::endl;
01319       LOG4CPLUS_INFO(getApplicationLogger(),ostr.str());
01320       setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
01321       MsgBuf master_message_rlr_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLR);
01322       for (unsigned int i = 0; i < subs_.size(); i++) {
01323         try {
01324           if (subs_[i].alive())
01325             subs_[i].post(master_message_rlr_,false);
01326         }
01327         catch (...) {}
01328       }
01329       rlimit_coresize_changed_=false;
01330       crashesThisRun_=0;
01331     }
01332   }
01333 
01334   if(running && edm_init_done_)
01335     {
01336       // if enabled, this loop will periodically check if dead slaves countdown has expired and restart them
01337       // this is only active while running, hence, the stop lock is acquired and only released at end of loop
01338       if(autoRestartSlaves_.value_){
01339         pthread_mutex_lock(&stop_lock_); //lockout slave killing at stop while you check for restarts
01340         for(unsigned int i = 0; i < subs_.size(); i++)
01341           {
01342             if(subs_[i].alive() != 1){
01343               if(subs_[i].countdown() == 0)
01344                 {
01345                   if(subs_[i].restartCount()>2){
01346                     LOG4CPLUS_WARN(getApplicationLogger()," Not restarting subprocess in slot " << i 
01347                                    << " - maximum restart count reached");
01348                     std::ostringstream ost1;
01349                     ost1 << "-W- Dead Process in slot " << i << " reached maximum restart count"; 
01350                     localLog(ost1.str());
01351                     subs_[i].countdown()--;
01352                     XCEPT_DECLARE(evf::Exception,
01353                                   sentinelException, ost1.str());
01354                     notifyQualified("error",sentinelException);
01355                     subs_[i].disconnect();
01356                     continue;
01357                   }
01358                   subs_[i].restartCount()++;
01359                   if (forkInEDM_.value_) {
01360                     restartForkInEDM(i);
01361                   }
01362                   else {
01363                     pid_t rr = subs_[i].forkNew();
01364                     if(rr==0)
01365                     {
01366                       myProcess_=&subs_[i];
01367                       scalersUpdates_ = 0;
01368                       int retval = pthread_mutex_destroy(&stop_lock_);
01369                       if(retval != 0) perror("error");
01370                       retval = pthread_mutex_init(&stop_lock_,0);
01371                       if(retval != 0) perror("error");
01372                       fsm_.disableRcmsStateNotification();
01373                       fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
01374                       fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition
01375                       fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition
01376                       try{
01377                         xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
01378                         if(lsid) {
01379                           ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died
01380                         }
01381                       }
01382                       catch(...){
01383                         std::cout << "trouble with lsindex during restart" << std::endl;
01384                       }
01385                       try{
01386                         xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
01387                         if(lstb) {
01388                           ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting
01389                         }
01390                       }
01391                       catch(...){
01392                         std::cout << "trouble with resetting flag for eol recovery " << std::endl;
01393                       }
01394 
01395                       evtProcessor_.adjustLsIndexForRestart();
01396                       evtProcessor_.resetPackedTriggerReport();
01397                       enableMPEPSlave();
01398                       return false; // exit the supervisor loop immediately in the child !!!
01399                     }
01400                   else
01401                     {
01402                       std::ostringstream ost1;
01403                       ost1 << "-I- New Process " << rr << " forked for slot " << i; 
01404                       localLog(ost1.str());
01405                     }
01406                   }
01407                 }
01408               if(subs_[i].countdown()>=0) subs_[i].countdown()--;
01409             }
01410           }
01411         pthread_mutex_unlock(&stop_lock_);
01412       } // finished handling replacement of dead slaves once they've been reaped
01413     }
01414   xdata::Serializable *lsid = 0; 
01415   xdata::Serializable *psid = 0;
01416   xdata::Serializable *dqmp = 0;
01417   xdata::UnsignedInteger32 *dqm = 0;
01418 
01419 
01420   
01421   if(running && edm_init_done_){  
01422     try{
01423       lsid = applicationInfoSpace_->find("lumiSectionIndex");
01424       psid = applicationInfoSpace_->find("prescaleSetIndex");
01425       nbProcessed = monitorInfoSpace_->find("nbProcessed");
01426       nbAccepted  = monitorInfoSpace_->find("nbAccepted");
01427       dqmp = applicationInfoSpace_-> find("nbDqmUpdates");      
01428     }
01429     catch(xdata::exception::Exception e){
01430       LOG4CPLUS_INFO(getApplicationLogger(),"could not retrieve some data - " << e.what());    
01431     }
01432 
01433     try{
01434       if(nbProcessed !=0 && nbAccepted !=0)
01435         {
01436           xdata::UnsignedInteger32*nbp = ((xdata::UnsignedInteger32*)nbProcessed);
01437           xdata::UnsignedInteger32*nba = ((xdata::UnsignedInteger32*)nbAccepted);
01438           xdata::UnsignedInteger32*ls  = ((xdata::UnsignedInteger32*)lsid);
01439           xdata::UnsignedInteger32*ps  = ((xdata::UnsignedInteger32*)psid);
01440           if(dqmp!=0)
01441             dqm = (xdata::UnsignedInteger32*)dqmp;
01442           if(dqm) dqm->value_ = 0;
01443           nbTotalDQM_ = 0;
01444           nbp->value_ = 0;
01445           nba->value_ = 0;
01446           nblive_ = 0;
01447           nbdead_ = 0;
01448           scalersUpdates_ = 0;
01449 
01450           for(unsigned int i = 0; i < subs_.size(); i++)
01451             {
01452               if(subs_[i].alive()>0)
01453                 {
01454                   nblive_++;
01455                   try{
01456                     subs_[i].post(master_message_prg_,true);
01457                     
01458                     unsigned long retval = subs_[i].rcvNonBlocking(master_message_prr_,true);
01459                     if(retval == (unsigned long) master_message_prr_->mtype){
01460                       prg* p = (struct prg*)(master_message_prr_->mtext);
01461                       subs_[i].setParams(p);
01462                       spMStates_[i] = p->Ms;
01463                       spmStates_[i] = p->ms;
01464                       cpustat_->addEntry(p->ms);
01465                       if(!subs_[i].inInconsistentState() && 
01466                          (p->Ms == edm::event_processor::sError 
01467                           || p->Ms == edm::event_processor::sInvalid
01468                           || p->Ms == edm::event_processor::sStopping))
01469                         {
01470                           std::ostringstream ost;
01471                           ost << "edm::eventprocessor slot " << i << " process id " 
01472                               << subs_[i].pid() << " not in Running state : Mstate=" 
01473                               << evtProcessor_.stateNameFromIndex(p->Ms) << " mstate="
01474                               << evtProcessor_.moduleNameFromIndex(p->ms) 
01475                               << " - Look into possible error messages from HLT process";
01476                           LOG4CPLUS_WARN(getApplicationLogger(),ost.str());
01477                         }
01478                       nbp->value_ += subs_[i].params().nbp;
01479                       nba->value_  += subs_[i].params().nba;
01480                       if(dqm)dqm->value_ += p->dqm;
01481                       nbTotalDQM_ +=  p->dqm;
01482                       scalersUpdates_ += p->trp;
01483                       if(p->ls > ls->value_) ls->value_ = p->ls;
01484                       if(p->ps != ps->value_) ps->value_ = p->ps;
01485                     }
01486                     else{
01487                       nbp->value_ += subs_[i].get_save_nbp();
01488                       nba->value_ += subs_[i].get_save_nba();
01489                     }
01490                   } 
01491                   catch(evf::Exception &e){
01492                     LOG4CPLUS_INFO(getApplicationLogger(),
01493                                    "could not send/receive msg on slot " 
01494                                    << i << " - " << e.what());    
01495                   }
01496                     
01497                 }
01498               else
01499                 {
01500                   nbp->value_ += subs_[i].get_save_nbp();
01501                   nba->value_ += subs_[i].get_save_nba();
01502                   nbdead_++;
01503                 }
01504             }
01505           if(nbp->value_>64){//have some slaves already processed more than one event ? (eventually make this == number of raw cells)
01506             for(unsigned int i = 0; i < subs_.size(); i++)
01507               {
01508                 if(subs_[i].params().nbp == 0){ // a slave has processed 0 events 
01509                   // check that the process is not stuck
01510                   if(subs_[i].alive()>0 && subs_[i].params().ms == 0) // the process is seen alive but in us=Invalid(0)
01511                     {
01512                       subs_[i].found_invalid();//increase the "found_invalid" counter
01513                       if(subs_[i].nfound_invalid() > 60){ //wait x monitor cycles (~1 min a good time ?) before doing something about a stuck slave
01514                         MsgBuf msg3(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_FSTOP);      // send a force-stop signal             
01515                         subs_[i].post(msg3,false);
01516                         std::ostringstream ost1;
01517                         ost1 << "-W- Process in slot " << i << " Never reached the running state - forcestopping it"; 
01518                         localLog(ost1.str());
01519                         LOG4CPLUS_ERROR(getApplicationLogger(),ost1.str());    
01520                         XCEPT_DECLARE(evf::Exception,
01521                                       sentinelException, ost1.str());
01522                         notifyQualified("error",sentinelException);
01523 
01524                       }
01525                     }
01526                 }
01527               }
01528           }
01529         }
01530     }
01531     catch(std::exception &e){
01532       LOG4CPLUS_INFO(getApplicationLogger(),"std exception - " << e.what());    
01533     }
01534     catch(...){
01535       LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception ");    
01536     }
01537   }
01538   else{
01539     for(unsigned int i = 0; i < subs_.size(); i++)
01540       {
01541         if(subs_[i].alive()==-1000)
01542           {
01543             spMStates_[i] = edm::event_processor::sInit;
01544             spmStates_[i] = 0;
01545           }
01546       }
01547   }
01548   try{
01549     monitorInfoSpace_->lock();
01550     monitorInfoSpace_->fireItemGroupChanged(names_,0);
01551     monitorInfoSpace_->unlock();
01552   }
01553   catch(xdata::exception::Exception &e)
01554     {
01555       LOG4CPLUS_ERROR(log_, "Exception from fireItemGroupChanged: " << e.what());
01556       //        localLog(e.what());
01557     }
01558   ::sleep(superSleepSec_.value_);       
01559   return true;
01560 }
01561 
01562 void FUEventProcessor::startScalersWorkLoop() throw (evf::Exception)
01563 {
01564   try {
01565     wlScalers_=
01566       toolbox::task::getWorkLoopFactory()->getWorkLoop("Scalers",
01567                                                        "waiting");
01568     if (!wlScalers_->isActive()) wlScalers_->activate();
01569     asScalers_ = toolbox::task::bind(this,&FUEventProcessor::scalers,
01570                                      "Scalers");
01571     
01572   wlScalers_->submit(asScalers_);
01573   wlScalersActive_ = true;
01574   }
01575   catch (xcept::Exception& e) {
01576     std::string msg = "Failed to start workloop 'Scalers'.";
01577     XCEPT_RETHROW(evf::Exception,msg,e);
01578   }
01579 }
01580 
01581 //______________________________________________________________________________
01582 
01583 void FUEventProcessor::startSummarizeWorkLoop() throw (evf::Exception)
01584 {
01585   try {
01586     wlSummarize_=
01587       toolbox::task::getWorkLoopFactory()->getWorkLoop("Summary",
01588                                                        "waiting");
01589     if (!wlSummarize_->isActive()) wlSummarize_->activate();
01590     
01591     asSummarize_ = toolbox::task::bind(this,&FUEventProcessor::summarize,
01592                                        "Summary");
01593 
01594     wlSummarize_->submit(asSummarize_);
01595     wlSummarizeActive_ = true;
01596   }
01597   catch (xcept::Exception& e) {
01598     std::string msg = "Failed to start workloop 'Summarize'.";
01599     XCEPT_RETHROW(evf::Exception,msg,e);
01600   }
01601 }
01602 
01603 //______________________________________________________________________________
01604 
01605 bool FUEventProcessor::scalers(toolbox::task::WorkLoop* wl)
01606 {
01607   if(evtProcessor_)
01608     {
01609       if(!evtProcessor_.getTriggerReport(true)) {
01610         wlScalersActive_ = false;
01611         return false;
01612       }
01613     }
01614   else
01615     {
01616       std::cout << getpid()<< " Scalers workloop, bailing out, no evtProcessor " << std::endl;
01617       wlScalersActive_ = false;
01618       return false;
01619     }
01620   if(myProcess_) 
01621     {
01622       //      std::cout << getpid() << "going to post on control queue from scalers" << std::endl;
01623       int ret = myProcess_->postSlave(evtProcessor_.getPackedTriggerReport(),false);
01624       if(ret!=0)      std::cout << "scalers workloop, error posting to sqs_ " << errno << std::endl;
01625       scalersUpdates_++;
01626     }
01627   else
01628     evtProcessor_.fireScalersUpdate();
01629   return true;
01630 }
01631 
01632 //______________________________________________________________________________
01633 bool FUEventProcessor::summarize(toolbox::task::WorkLoop* wl)
01634 {
01635   evtProcessor_.resetPackedTriggerReport();
01636   bool atLeastOneProcessUpdatedSuccessfully = false;
01637   int msgCount = 0;
01638   for (unsigned int i = 0; i < subs_.size(); i++)
01639     {
01640       if(subs_[i].alive()>0)
01641         {
01642           int ret = 0;
01643           if(subs_[i].check_postponed_trigger_update(master_message_trr_,
01644                                                      evtProcessor_.getLumiSectionReferenceIndex()))
01645             {
01646               ret = MSQS_MESSAGE_TYPE_TRR;
01647               std::cout << "using postponed report from slot " << i << " for ls " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01648             }
01649           else{
01650             bool insync = false;
01651             bool exception_caught = false;
01652             while(!insync){
01653               try{
01654                 ret = subs_[i].rcv(master_message_trr_,false);
01655               }
01656               catch(evf::Exception &e)
01657                 {
01658                   std::cout << "exception in msgrcv on " << i 
01659                             << " " << subs_[i].alive() << " " << strerror(errno) << std::endl;
01660                   exception_caught = true;
01661                   break;
01662                   //do nothing special
01663                 }
01664               if(ret==MSQS_MESSAGE_TYPE_TRR) {
01665                 TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
01666                 if(trp->lumiSection >= evtProcessor_.getLumiSectionReferenceIndex()){
01667                   insync = true;
01668                 }
01669               }
01670             }
01671             if(exception_caught) continue;
01672           }
01673           msgCount++;
01674           if(ret==MSQS_MESSAGE_TYPE_TRR) {
01675             TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
01676             if(trp->lumiSection > evtProcessor_.getLumiSectionReferenceIndex()){
01677               std::cout << "postpone handling of msg from slot " << i << " with Ls " <<  trp->lumiSection
01678                         << " should be " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01679               subs_[i].add_postponed_trigger_update(master_message_trr_);
01680             }else{
01681               atLeastOneProcessUpdatedSuccessfully = true;
01682               evtProcessor_.sumAndPackTriggerReport(master_message_trr_);
01683             }
01684           }
01685           else std::cout << "msgrcv returned error " << errno << std::endl;
01686         }
01687     }
01688   if(atLeastOneProcessUpdatedSuccessfully){
01689     nbSubProcessesReporting_.value_ = msgCount;
01690     evtProcessor_.getPackedTriggerReportAsStruct()->nbExpected = nbSubProcesses_.value_;
01691     evtProcessor_.getPackedTriggerReportAsStruct()->nbReporting = nbSubProcessesReporting_.value_;
01692     evtProcessor_.updateRollingReport();
01693     evtProcessor_.fireScalersUpdate();
01694   }
01695   else{
01696     LOG4CPLUS_WARN(getApplicationLogger(),"Summarize loop: no process updated successfully - sleep 10 seconds before trying again");          
01697     if(msgCount==0) evtProcessor_.withdrawLumiSectionIncrement();
01698     nbSubProcessesReporting_.value_ = 0;
01699     ::sleep(10);
01700   }
01701   if(fsm_.stateName()->toString()!="Enabled"){
01702     wlScalersActive_ = false;
01703     return false;
01704   }
01705   //  cpustat_->printStat();
01706   if(iDieStatisticsGathering_.value_){
01707     try{
01708       unsigned long long idleTmp=idleProcStats_;
01709       unsigned long long allPSTmp=allProcStats_;
01710       idleProcStats_=allProcStats_=0;
01711 
01712       utils::procCpuStat(idleProcStats_,allProcStats_);
01713       timeval oldtime=lastProcReport_;
01714       gettimeofday(&lastProcReport_,0);
01715 
01716       if (allPSTmp!=0 && idleTmp!=0 && allProcStats_!=allPSTmp) {
01717         cpustat_->setCPUStat(1000 - ((idleProcStats_-idleTmp)*1000)/(allProcStats_-allPSTmp));
01718         int deltaTms=1000 * (lastProcReport_.tv_sec-oldtime.tv_sec)
01719                     + (lastProcReport_.tv_usec-oldtime.tv_usec)/1000;
01720         cpustat_->setElapsed(deltaTms);
01721       }
01722       else {
01723         cpustat_->setCPUStat(0);
01724         cpustat_->setElapsed(0);
01725       }
01726 
01727       TriggerReportStatic *trsp = evtProcessor_.getPackedTriggerReportAsStruct();
01728       cpustat_ ->setNproc(trsp->eventSummary.totalEvents);
01729       cpustat_ ->sendStat(evtProcessor_.getLumiSectionReferenceIndex());
01730       ratestat_->sendStat((unsigned char*)trsp,
01731                           sizeof(TriggerReportStatic),
01732                           evtProcessor_.getLumiSectionReferenceIndex());
01733     }catch(evf::Exception &e){
01734       LOG4CPLUS_INFO(getApplicationLogger(),"coud not send statistics"
01735                      << e.what());
01736     }
01737   }
01738   cpustat_->reset();
01739   return true;
01740 }
01741 
01742 
01743 
01744 bool FUEventProcessor::receivingAndMonitor(toolbox::task::WorkLoop *)
01745 {
01746   try{
01747     myProcess_->rcvSlave(slave_message_monitoring_,true); //will receive only messages from Master
01748     switch(slave_message_monitoring_->mtype)
01749       {
01750       case MSQM_MESSAGE_TYPE_MCS:
01751         {
01752           xgi::Input *in = 0;
01753           xgi::Output out;
01754           evtProcessor_.microState(in,&out);
01755           MsgBuf msg1(out.str().size(),MSQS_MESSAGE_TYPE_MCR);
01756           strncpy(msg1->mtext,out.str().c_str(),out.str().size());
01757           myProcess_->postSlave(msg1,true);
01758           break;
01759         }
01760       
01761       case MSQM_MESSAGE_TYPE_PRG:
01762         {
01763           xdata::Serializable *dqmp = 0;
01764           xdata::UnsignedInteger32 *dqm = 0;
01765           evtProcessor_.monitoring(0);
01766           try{
01767             dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
01768           }  catch(xdata::exception::Exception e){}
01769           if(dqmp!=0)
01770             dqm = (xdata::UnsignedInteger32*)dqmp;
01771 
01772           //      monitorInfoSpace_->lock();  
01773           prg * data           = (prg*)slave_message_prr_->mtext;
01774           data->ls             = evtProcessor_.lsid_;
01775           data->eols           = evtProcessor_.lastLumiUsingEol_;
01776           data->ps             = evtProcessor_.psid_;
01777           data->nbp            = evtProcessor_->totalEvents();
01778           data->nba            = evtProcessor_->totalEventsPassed();
01779           data->Ms             = evtProcessor_.epMAltState_.value_;
01780           data->ms             = evtProcessor_.epmAltState_.value_;
01781           if(dqm) data->dqm    = dqm->value_; else data->dqm = 0;
01782           data->trp            = scalersUpdates_;
01783           //      monitorInfoSpace_->unlock();  
01784           myProcess_->postSlave(slave_message_prr_,true);
01785           if(exitOnError_.value_)
01786           { 
01787             // after each monitoring cycle check if we are in inconsistent state and exit if configured to do so  
01788             //      std::cout << getpid() << "receivingAndMonitor: trying to acquire stop lock " << std::endl;
01789             if(data->Ms == edm::event_processor::sError) 
01790               { 
01791                 bool running = true;
01792                 int count = 0;
01793                 while(running){
01794                   int retval = pthread_mutex_lock(&stop_lock_);
01795                   if(retval != 0) perror("error");
01796                   running = fsm_.stateName()->toString()=="Enabled";
01797                   if(count>5) _exit(-1);
01798                   pthread_mutex_unlock(&stop_lock_);
01799                   if(running) {::sleep(1); count++;}
01800                 }
01801               }
01802           }
01803           break;
01804         }
01805       case MSQM_MESSAGE_TYPE_WEB:
01806         {
01807           xgi::Input  *in = 0;
01808           xgi::Output out;
01809           unsigned int bytesToSend = 0;
01810           MsgBuf msg1(NUMERIC_MESSAGE_SIZE,MSQS_MESSAGE_TYPE_WEB);
01811           std::string query = slave_message_monitoring_->mtext;
01812           size_t pos = query.find_first_of("&");
01813           std::string method;
01814           std::string args;
01815           if(pos!=std::string::npos)  
01816             {
01817               method = query.substr(0,pos);
01818               args = query.substr(pos+1,query.length()-pos-1);
01819             }
01820           else
01821             method=query;
01822 
01823           if(method=="Spotlight")
01824             {
01825               spotlightWebPage(in,&out);
01826             }
01827           else if(method=="procStat")
01828             {
01829               procStat(in,&out);
01830             }
01831           else if(method=="moduleWeb")
01832             {
01833               internal::MyCgi mycgi;
01834               boost::char_separator<char> sep(";");
01835               boost::tokenizer<boost::char_separator<char> > tokens(args, sep);
01836               for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
01837                    tok_iter != tokens.end(); ++tok_iter){
01838                 size_t pos = (*tok_iter).find_first_of("%");
01839                 if(pos != std::string::npos){
01840                   std::string first  = (*tok_iter).substr(0    ,                        pos);
01841                   std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
01842                   mycgi.getEnvironment()[first]=second;
01843                 }
01844               }
01845               moduleWeb(&mycgi,&out);
01846             }
01847           else if(method=="Default")
01848             {
01849               defaultWebPage(in,&out);
01850             }
01851           else 
01852             {
01853               out << "Error 404!!!!!!!!" << std::endl;
01854             }
01855 
01856 
01857           bytesToSend = out.str().size();
01858           unsigned int cycle = 0;
01859           if(bytesToSend==0)
01860             {
01861               snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", bytesToSend);
01862               myProcess_->postSlave(msg1,true);
01863             }
01864           while(bytesToSend !=0){
01865             unsigned int msgSize = bytesToSend>MAX_PIPE_BUFFER_SIZE ? MAX_PIPE_BUFFER_SIZE : bytesToSend;
01866             write(anonymousPipe_[PIPE_WRITE],
01867                   out.str().c_str()+MAX_PIPE_BUFFER_SIZE*cycle,
01868                   msgSize);
01869             snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", msgSize);
01870             myProcess_->postSlave(msg1,true);
01871             bytesToSend -= msgSize;
01872             cycle++;
01873           }
01874           break;
01875         }
01876       case MSQM_MESSAGE_TYPE_TRP:
01877         {
01878           break;
01879         }
01880       }
01881   }
01882   catch(evf::Exception &e){std::cout << "exception caught in recevingM: " << e.what() << std::endl;}
01883   return true;
01884 }
01885 
01886 void FUEventProcessor::startSignalMonitorWorkLoop() throw (evf::Exception)
01887 {
01888   //todo rewind/check semaphore
01889   //start workloop
01890   try {
01891     wlSignalMonitor_=
01892       toolbox::task::getWorkLoopFactory()->getWorkLoop("SignalMonitor",
01893                                                        "waiting");
01894 
01895     if (!wlSignalMonitor_->isActive()) wlSignalMonitor_->activate();
01896     asSignalMonitor_ = toolbox::task::bind(this,&FUEventProcessor::sigmon,
01897                                        "SignalMonitor");
01898     wlSignalMonitor_->submit(asSignalMonitor_);
01899     signalMonitorActive_ = true;
01900   }
01901   catch (xcept::Exception& e) {
01902     std::string msg = "Failed to start workloop 'SignalMonitor'. (3)";
01903     std::cout << e.what() << std::endl;
01904     XCEPT_RETHROW(evf::Exception,msg,e);
01905   }
01906 }
01907  
01908 
01909 bool FUEventProcessor::sigmon(toolbox::task::WorkLoop* wl)
01910 {
01911   while (1) {
01912     sem_wait(sigmon_sem_);
01913     std::cout << " received signal notification from slave!"<< std::endl;
01914   
01915     //check if shutdown time
01916     bool running = fsm_.stateName()->toString()=="Enabled";
01917     bool stopping = fsm_.stateName()->toString()=="stopping";
01918     bool enabling = fsm_.stateName()->toString()=="enabling";
01919     if (!running && !enabling) {
01920       signalMonitorActive_ = false;
01921       return false;
01922     }
01923 
01924     crashesThisRun_++;
01925     gettimeofday(&lastCrashTime_,0);
01926 
01927     //set core size limit to 0 in master and slaves
01928     if (crashesThisRun_>=crashesToDump_.value_ && (running || stopping) && !rlimit_coresize_changed_) {
01929 
01930       rlimit rlold;
01931       getrlimit(RLIMIT_CORE,&rlold);
01932       rlimit rlnew = rlold;
01933       rlnew.rlim_cur=0;
01934       setrlimit(RLIMIT_CORE,&rlnew);
01935       rlimit_coresize_changed_=true;
01936       MsgBuf master_message_rli_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLI);
01937       //in case of frequent crashes, allow first slot to dump (until restart)
01938       unsigned int min=1;
01939       for (unsigned int i = min; i < subs_.size(); i++) {
01940         try {
01941           if (subs_[i].alive()) {
01942             subs_[i].post(master_message_rli_,false);
01943           }
01944         }
01945         catch (...) {}
01946       }
01947       std::ostringstream ostr;
01948       ostr << "Number of recent slave crashes reaches " << crashesThisRun_
01949            << ". Disabling core dumps for next 15 minutes in this FilterUnit";
01950       LOG4CPLUS_WARN(getApplicationLogger(),ostr.str());
01951     }
01952   }//end while loop
01953   signalMonitorActive_ = false;
01954   return false;
01955 }
01956 
01957 
01958 bool FUEventProcessor::enableCommon()
01959 {
01960   try {    
01961     if(hasShMem_) attachDqmToShm();
01962     int sc = 0;
01963     evtProcessor_->clearCounters();
01964     if(isRunNumberSetter_)
01965       evtProcessor_->setRunNumber(runNumber_.value_);
01966     else
01967       evtProcessor_->declareRunNumber(runNumber_.value_);
01968 
01969     try{
01970       ::sleep(1);
01971       evtProcessor_->runAsync();
01972       sc = evtProcessor_->statusAsync();
01973     }
01974     catch(cms::Exception &e) {
01975       reasonForFailedState_ = e.explainSelf();
01976       fsm_.fireFailed(reasonForFailedState_,this);
01977       localLog(reasonForFailedState_);
01978       return false;
01979     }    
01980     catch(std::exception &e) {
01981       reasonForFailedState_  = e.what();
01982       fsm_.fireFailed(reasonForFailedState_,this);
01983       localLog(reasonForFailedState_);
01984       return false;
01985     }
01986     catch(...) {
01987       reasonForFailedState_ = "Unknown Exception";
01988       fsm_.fireFailed(reasonForFailedState_,this);
01989       localLog(reasonForFailedState_);
01990       return false;
01991     }
01992     if(sc != 0) {
01993       std::ostringstream oss;
01994       oss<<"EventProcessor::runAsync returned status code " << sc;
01995       reasonForFailedState_ = oss.str();
01996       fsm_.fireFailed(reasonForFailedState_,this);
01997       localLog(reasonForFailedState_);
01998       return false;
01999     }
02000   }
02001   catch (xcept::Exception &e) {
02002     reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
02003     fsm_.fireFailed(reasonForFailedState_,this);
02004     localLog(reasonForFailedState_);
02005     return false;
02006   }
02007   try{
02008     fsm_.fireEvent("EnableDone",this);
02009   }
02010   catch (xcept::Exception &e) {
02011     std::cout << "exception " << (std::string)e.what() << std::endl;
02012     throw;
02013   }
02014 
02015   return false;
02016 }
02017 
02018 void FUEventProcessor::forkProcessFromEDM_helper(void * addr) {
02019   ((FUEventProcessor*)addr)->forkProcessesFromEDM();
02020 }
02021 
02022 void FUEventProcessor::forkProcessesFromEDM() {
02023 
02024   moduleweb::ForkParams * forkParams = &(forkInfoObj_->forkParams);
02025   unsigned int forkFrom=0;
02026   unsigned int forkTo=nbSubProcesses_.value_;
02027   if (forkParams->slotId>=0) {
02028     forkFrom=forkParams->slotId;
02029     forkTo=forkParams->slotId+1;
02030   }
02031 
02032   //before fork, make sure to disconnect output modules from Shm
02033   try {
02034     if (sorRef_) {
02035       std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
02036       for (unsigned int i=0;i<shmOutputs.size();i++) {
02037         //unregister PID from ShmBuffer/RB
02038         shmOutputs[i]->unregisterFromShm();
02039         //disconnect from Shm
02040         shmOutputs[i]->stop();
02041       }
02042     }
02043   }
02044   catch (std::exception &e)
02045   {
02046     reasonForFailedState_ =  (std::string)"Thrown exception while disconnecting ShmOutputModule from Shm: " + e.what();
02047     LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02048     fsm_.fireFailed(reasonForFailedState_,this);
02049     localLog(reasonForFailedState_);
02050   }
02051   catch (...) {
02052     reasonForFailedState_ =  "Thrown unknown exception while disconnecting ShmOutputModule from Shm: ";
02053     LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02054     fsm_.fireFailed(reasonForFailedState_,this);
02055     localLog(reasonForFailedState_);
02056   }
02057 
02058   std::string currentState = fsm_.stateName()->toString();
02059 
02060   //destroy MessageServicePresence thread before fork
02061   if (currentState!="stopping") {
02062     try {
02063       messageServicePresence_.reset();
02064     }
02065     catch (...) {
02066       LOG4CPLUS_ERROR(getApplicationLogger(),"Unable to destroy MessageService thread before fork!");
02067     }
02068   }
02069 
02070   if (currentState=="stopping") {
02071     LOG4CPLUS_ERROR(getApplicationLogger(),"Can not fork subprocesses in state " << fsm_.stateName()->toString());
02072     forkParams->isMaster=1;
02073     forkInfoObj_->forkParams.slotId=-1;
02074     forkInfoObj_->forkParams.restart=0;
02075   }
02076   //fork loop
02077   else for(unsigned int i=forkFrom; i<forkTo; i++)
02078   {
02079     int retval = subs_[i].forkNew();
02080     if(retval==0)
02081     {
02082       forkParams->isMaster=0;
02083       myProcess_ = &subs_[i];
02084       // dirty hack: delete/recreate global binary semaphore for later use in child
02085       delete toolbox::mem::_s_mutex_ptr_;
02086       toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
02087       int retval = pthread_mutex_destroy(&stop_lock_);
02088       if(retval != 0) perror("error");
02089       retval = pthread_mutex_init(&stop_lock_,0);
02090       if(retval != 0) perror("error");
02091       fsm_.disableRcmsStateNotification();
02092 
02093       //recreate MessageLogger thread in slave after fork
02094       try{
02095         edm::PresenceFactory *pf = edm::PresenceFactory::get();
02096         if(pf != 0) {
02097           messageServicePresence_ = pf->makePresence("MessageServicePresence");
02098         }
02099         else {
02100           LOG4CPLUS_ERROR(getApplicationLogger(),
02101               "SLAVE: Unable to create message service presence. pid:"<<getpid());
02102         }
02103       }
02104       catch(...) {
02105         LOG4CPLUS_ERROR(getApplicationLogger(),"SLAVE: Unknown Exception in MessageServicePresence. pid:"<<getpid());
02106       }
02107 
02108       ML::MLlog4cplus::setAppl(this);
02109 
02110       //reconnect to Shm from output modules
02111       try {
02112         if (sorRef_) {
02113           std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
02114           for (unsigned int i=0;i<shmOutputs.size();i++)
02115             shmOutputs[i]->start();
02116         }
02117       }
02118       catch (...)
02119       {
02120         LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception (ShmOutputModule sending InitMsg (pid:"<<getpid() <<")");
02121       }
02122 
02123       if (forkParams->restart) {
02124         //do restart things
02125         scalersUpdates_ = 0;
02126         try {
02127           fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
02128           fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition
02129           fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition
02130         } catch (...) {
02131           LOG4CPLUS_WARN(getApplicationLogger(),"Failed to Stop/Enable FSM of the restarted slave EP");
02132         }
02133         try{
02134           xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
02135           if(lsid) {
02136             ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died
02137           }
02138         }
02139         catch(...){
02140           std::cout << "trouble with lsindex during restart" << std::endl;
02141         }
02142         try{
02143           xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
02144           if(lstb) {
02145             ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting
02146           }
02147         }
02148         catch(...){
02149           std::cout << "trouble with resetting flag for eol recovery " << std::endl;
02150         }
02151         evtProcessor_.adjustLsIndexForRestart();
02152         evtProcessor_.resetPackedTriggerReport();
02153       }
02154 
02155       //start other threads
02156       startReceivingLoop();
02157       startReceivingMonitorLoop();
02158       startScalersWorkLoop();
02159       while(!evtProcessor_.isWaitingForLs())
02160         ::usleep(100000);//wait for scalers loop to start
02161 
02162       //connect DQMShmOutputModule
02163       if(hasShMem_) attachDqmToShm();
02164 
02165       //catch transition error if we are already Enabled
02166       try {
02167         fsm_.fireEvent("EnableDone",this);
02168       }
02169       catch (...) {}
02170 
02171       //make sure workloops are started
02172       while (!wlReceiving_->isActive() || !wlReceivingMonitor_->isActive()) usleep(10000);
02173 
02174       //unmask signals
02175       sigset_t tmpset_thread;
02176       sigemptyset(&tmpset_thread);
02177       sigaddset(&tmpset_thread, SIGQUIT);
02178       sigaddset(&tmpset_thread, SIGILL);
02179       sigaddset(&tmpset_thread, SIGABRT);
02180       sigaddset(&tmpset_thread, SIGFPE);
02181       sigaddset(&tmpset_thread, SIGSEGV);
02182       sigaddset(&tmpset_thread, SIGALRM);
02183       //sigprocmask(SIG_UNBLOCK, &tmpset_thread, 0);
02184       pthread_sigmask(SIG_UNBLOCK,&tmpset_thread,0);
02185      
02186       //set signal handlers 
02187       struct sigaction sa;
02188       sigset_t tmpset;
02189       memset(&tmpset,0,sizeof(tmpset));
02190       sigemptyset(&tmpset);
02191       sa.sa_mask=tmpset;
02192       sa.sa_flags=SA_RESETHAND | SA_SIGINFO;
02193       sa.sa_handler=0;
02194       sa.sa_sigaction=evfep_sighandler;
02195 
02196       sigaction(SIGQUIT,&sa,0);
02197       sigaction(SIGILL,&sa,0);
02198       sigaction(SIGABRT,&sa,0);
02199       sigaction(SIGFPE,&sa,0);
02200       sigaction(SIGSEGV,&sa,0);
02201       sa.sa_sigaction=evfep_alarmhandler;
02202       sigaction(SIGALRM,&sa,0);
02203 
02204       //child return to DaqSource
02205       return ;
02206     }
02207     else {
02208 
02209       forkParams->isMaster=1;
02210       forkInfoObj_->forkParams.slotId=-1;
02211       if (forkParams->restart) {
02212         std::ostringstream ost1;
02213         ost1 << "-I- New Process " << retval << " forked for slot " << forkParams->slotId;
02214         localLog(ost1.str());
02215       }
02216       forkInfoObj_->forkParams.restart=0;
02217       //start "crash" receiver workloop
02218     }
02219   }
02220 
02221   //recreate MessageLogger thread after fork
02222   try{
02223     //release the presense factory in master
02224     edm::PresenceFactory *pf = edm::PresenceFactory::get();
02225     if(pf != 0) {
02226       messageServicePresence_ = pf->makePresence("MessageServicePresence");
02227     }
02228     else {
02229       LOG4CPLUS_ERROR(getApplicationLogger(),
02230           "Unable to recreate message service presence ");
02231     }
02232   }
02233   catch(...) {
02234     LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception in MessageServicePresence");
02235   }
02236   restart_in_progress_=false;
02237   edm_init_done_=true;
02238 }
02239 
02240 bool FUEventProcessor::enableForkInEDM() 
02241 {
02242   evtProcessor_.resetWaiting();
02243   try {
02244     //set to connect to Shm later
02245     //if(hasShMem_) setAttachDqmToShm();
02246 
02247     int sc = 0;
02248     //maybe not needed in MP mode
02249     evtProcessor_->clearCounters();
02250     if(isRunNumberSetter_)
02251       evtProcessor_->setRunNumber(runNumber_.value_);
02252     else
02253       evtProcessor_->declareRunNumber(runNumber_.value_);
02254 
02255     //prepare object used to communicate with DaqSource
02256     pthread_mutex_destroy(&forkObjLock_);
02257     pthread_mutex_init(&forkObjLock_,0);
02258     if (forkInfoObj_) delete forkInfoObj_;
02259     forkInfoObj_ = new moduleweb::ForkInfoObj();
02260     forkInfoObj_->mst_lock_=&forkObjLock_;
02261     forkInfoObj_->fuAddr=(void*)this;
02262     forkInfoObj_->forkHandler = forkProcessFromEDM_helper;
02263     forkInfoObj_->forkParams.slotId=-1;
02264     forkInfoObj_->forkParams.restart=0;
02265     forkInfoObj_->forkParams.isMaster=-1;
02266     forkInfoObj_->stopCondition=0;
02267     if (mwrRef_)
02268       mwrRef_->publishForkInfo(std::string("DaqSource"),forkInfoObj_);
02269 
02270     evtProcessor_->runAsync();
02271     sc = evtProcessor_->statusAsync();
02272 
02273     if(sc != 0) {
02274       std::ostringstream oss;
02275       oss<<"EventProcessor::runAsync returned status code " << sc;
02276       reasonForFailedState_ = oss.str();
02277       fsm_.fireFailed(reasonForFailedState_,this);
02278       LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02279       return false;
02280     }
02281   }
02282   //catch exceptions on master side
02283   catch(cms::Exception &e) {
02284     reasonForFailedState_ = e.explainSelf();
02285     fsm_.fireFailed(reasonForFailedState_,this);
02286     LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02287     return false;
02288   }    
02289   catch(std::exception &e) {
02290     reasonForFailedState_  = e.what();
02291     fsm_.fireFailed(reasonForFailedState_,this);
02292     LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02293     return false;
02294   }
02295   catch(...) {
02296     reasonForFailedState_ = "Unknown Exception";
02297     fsm_.fireFailed(reasonForFailedState_,this);
02298     LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02299     return false;
02300   }
02301   return true;
02302 }
02303 
02304 bool FUEventProcessor::restartForkInEDM(unsigned int slotId) {
02305   //daqsource will keep this lock until master returns after fork
02306   //so that we don't do another EP restart in between
02307   forkInfoObj_->lock();
02308   forkInfoObj_->forkParams.slotId=slotId;
02309   forkInfoObj_->forkParams.restart=true;
02310   forkInfoObj_->forkParams.isMaster=1;
02311   forkInfoObj_->stopCondition=0;
02312   LOG4CPLUS_DEBUG(log_, " restarting subprocess in slot "<< slotId <<": posting on control semaphore");
02313   sem_post(forkInfoObj_->control_sem_);
02314   forkInfoObj_->unlock();
02315   usleep(1000);
02316   //sleep until fork is performed
02317   int count=50;
02318   restart_in_progress_=true;
02319   while (restart_in_progress_ && count--) usleep(20000);
02320   return true;
02321 }
02322 
02323 bool FUEventProcessor::enableClassic()
02324 {
02325   bool retval = enableCommon();
02326   while(evtProcessor_->getState()!= edm::event_processor::sRunning){
02327     LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
02328     ::sleep(1);
02329   }
02330   
02331   //  implementation moved to EPWrapper
02332   //  startScalersWorkLoop(); // this is now not done any longer 
02333   localLog("-I- Start completed");
02334   return retval;
02335 }
02336 bool FUEventProcessor::enableMPEPSlave()
02337 {
02338   //all this happens only in the child process
02339 
02340   startReceivingLoop();
02341   startReceivingMonitorLoop();
02342   evtProcessor_.resetWaiting();
02343   startScalersWorkLoop();
02344   while(!evtProcessor_.isWaitingForLs())
02345     ::sleep(1);
02346 
02347   // @EM test do not run monitor loop in slave, only receiving&Monitor
02348   //  evtProcessor_.startMonitoringWorkLoop();
02349   try{
02350     //    evtProcessor_.makeServicesOnly();
02351     try{
02352       edm::PresenceFactory *pf = edm::PresenceFactory::get();
02353       if(pf != 0) {
02354         pf->makePresence("MessageServicePresence").release();
02355       }
02356       else {
02357         LOG4CPLUS_ERROR(getApplicationLogger(),
02358                         "Unable to create message service presence ");
02359       }
02360     } 
02361     catch(...) {
02362       LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
02363     }
02364   ML::MLlog4cplus::setAppl(this);
02365   }       
02366   catch (xcept::Exception &e) {
02367     reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
02368     fsm_.fireFailed(reasonForFailedState_,this);
02369     localLog(reasonForFailedState_);
02370   }
02371   bool retval =  enableCommon();
02372   //  while(evtProcessor_->getState()!= edm::event_processor::sRunning){
02373   //    LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
02374   //    ::sleep(1);
02375   //  }
02376   return retval;
02377 }
02378 
02379 bool FUEventProcessor::stopClassic()
02380 {
02381   bool failed=false;
02382   try {
02383     LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping :) ...");
02384     edm::EventProcessor::StatusCode rc = evtProcessor_.stop();
02385     if(rc == edm::EventProcessor::epSuccess) 
02386       fsm_.fireEvent("StopDone",this);
02387     else
02388       {
02389         failed=true;
02390         //      epMState_ = evtProcessor_->currentStateName();
02391         if(rc == edm::EventProcessor::epTimedOut)
02392           reasonForFailedState_ = "EventProcessor stop timed out";
02393         else
02394           reasonForFailedState_ = "EventProcessor did not receive STOP event";
02395       }
02396   }
02397   catch (xcept::Exception &e) {
02398     failed=true;
02399     reasonForFailedState_ = "Stopping FAILED: " + (std::string)e.what();
02400   }
02401   catch (edm::Exception &e) {
02402     failed=true;
02403     reasonForFailedState_ = "Stopping FAILED: " + (std::string)e.what();
02404   }
02405   catch (...) {
02406     failed=true;
02407     reasonForFailedState_= "Stopping FAILED: unknown exception";
02408   }
02409   try {
02410     if (hasShMem_) {
02411       detachDqmFromShm();
02412       if (failed) 
02413         LOG4CPLUS_WARN(getApplicationLogger(), 
02414             "In failed STOP - success detaching DQM from Shm. pid:" << getpid());
02415     }
02416   }
02417   catch (cms::Exception & e) {
02418     failed=true;
02419     reasonForFailedState_= "Stopping FAILED: " + (std::string)e.what();
02420   }
02421   catch (...) {
02422     failed=true;
02423     reasonForFailedState_= "DQM detach failed: Unknown exception";
02424   }
02425 
02426   if (failed) {
02427     LOG4CPLUS_FATAL(getApplicationLogger(),"STOP failed: "
02428         << reasonForFailedState_ << " (pid:" << getpid()<<")");
02429     localLog(reasonForFailedState_);
02430     fsm_.fireFailed(reasonForFailedState_,this);
02431   }
02432 
02433   LOG4CPLUS_INFO(getApplicationLogger(),"Finished stopping!");
02434   localLog("-I- Stop completed");
02435   return false;
02436 }
02437 
02438 void FUEventProcessor::stopSlavesAndAcknowledge()
02439 {
02440   MsgBuf msg(0,MSQM_MESSAGE_TYPE_STOP);
02441   MsgBuf msg1(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_STOP);
02442 
02443   std::vector<bool> processes_to_stop(nbSubProcesses_.value_,false);
02444   for(unsigned int i = 0; i < subs_.size(); i++)
02445     {
02446       pthread_mutex_lock(&stop_lock_);
02447       if(subs_[i].alive()>0){
02448         processes_to_stop[i] = true;
02449         subs_[i].post(msg,false);
02450       }
02451       pthread_mutex_unlock(&stop_lock_);
02452     }
02453   for(unsigned int i = 0; i < subs_.size(); i++)
02454     {
02455       pthread_mutex_lock(&stop_lock_);
02456       if(processes_to_stop[i]){
02457         try{
02458           subs_[i].rcv(msg1,false);
02459         }
02460         catch(evf::Exception &e){
02461           std::ostringstream ost;
02462           ost << "failed to get STOP - errno ->" << errno << " " << e.what(); 
02463           reasonForFailedState_ = ost.str();
02464           LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02465           //      fsm_.fireFailed(reasonForFailedState_,this);
02466           localLog(reasonForFailedState_);
02467           pthread_mutex_unlock(&stop_lock_);
02468           continue;
02469         }
02470       }
02471       else {
02472         pthread_mutex_unlock(&stop_lock_);
02473         continue;
02474       }
02475       pthread_mutex_unlock(&stop_lock_);
02476       if(msg1->mtype==MSQS_MESSAGE_TYPE_STOP)
02477         while(subs_[i].alive()>0) ::usleep(10000);
02478       subs_[i].disconnect();
02479     }
02480   //  subs_.clear();
02481 
02482 }
02483 
02484 void FUEventProcessor::microState(xgi::Input *in,xgi::Output *out) throw (xgi::exception::Exception)
02485 {
02486   std::string urn = getApplicationDescriptor()->getURN();
02487   try{
02488     evtProcessor_.stateNameFromIndex(0);
02489     evtProcessor_.moduleNameFromIndex(0);
02490   if(myProcess_) {std::cout << "microstate called for child! bail out" << std::endl; return;}
02491   *out << "<tr><td>" << fsm_.stateName()->toString() 
02492        << "</td><td>"<< (myProcess_ ? "S" : "M") <<"</td><td>" << nblive_ << "</td><td>"
02493        << nbdead_ << "</td><td><a href=\"/" << urn << "/procStat\">" << getpid() <<"</a></td>";
02494   evtProcessor_.microState(in,out);
02495   *out << "<td></td><td>" << nbTotalDQM_ 
02496        << "</td><td>" << evtProcessor_.getScalersUpdates() << "</td></tr>";
02497   if(nbSubProcesses_.value_!=0 && !myProcess_) 
02498     {
02499       pthread_mutex_lock(&start_lock_);
02500       for(unsigned int i = 0; i < subs_.size(); i++)
02501         {
02502           try{
02503             if(subs_[i].alive()>0)
02504               {
02505                 *out << "<tr><td  bgcolor=\"#00FF00\" id=\"a"
02506                      << i << "\">""Alive</td><td>S</td><td>"
02507                      << subs_[i].queueId() << "<td>" 
02508                      << subs_[i].queueStatus()<< "/"
02509                      << subs_[i].queueOccupancy() << "/"
02510                      << subs_[i].queuePidOfLastSend() << "/"
02511                      << subs_[i].queuePidOfLastReceive() 
02512                      << "</td><td><a id=\"p"<< i << "\" href=\"SubWeb?process=" 
02513                      << subs_[i].pid() << "&method=procStat\">" 
02514                      << subs_[i].pid()<<"</a></td>" //<< msg->mtext;
02515                      << "<td>" << evtProcessor_.stateNameFromIndex(subs_[i].params().Ms) << "</td><td>" 
02516                      << evtProcessor_.moduleNameFromIndex(subs_[i].params().ms) << "</td><td>" 
02517                      << subs_[i].params().nba << "/" << subs_[i].params().nbp 
02518                      << " (" << float(subs_[i].params().nba)/float(subs_[i].params().nbp)*100. <<"%)" 
02519                      << "</td><td>" << subs_[i].params().ls  << "/" << subs_[i].params().ls 
02520                      << "</td><td>" << subs_[i].params().ps 
02521                      << "</td><td" 
02522                      << ((subs_[i].params().eols<subs_[i].params().ls) ? " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")  << ">" 
02523                      << subs_[i].params().eols  
02524                      << "</td><td>" << subs_[i].params().dqm 
02525                      << "</td><td>" << subs_[i].params().trp << "</td>";
02526               }
02527             else 
02528               {
02529                 pthread_mutex_lock(&pickup_lock_);
02530                 *out << "<tr><td id=\"a"<< i << "\" ";
02531                 if(subs_[i].alive()==-1000)
02532                   *out << " bgcolor=\"#bbaabb\">NotInitialized";
02533                 else
02534                   *out << (subs_[i].alive()==0 ? ">Done" : " bgcolor=\"#FF0000\">Dead");
02535                 *out << "</td><td>S</td><td>"<< subs_[i].queueId() << "<td>" 
02536                      << subs_[i].queueStatus() << "/"
02537                      << subs_[i].queueOccupancy() << "/"
02538                      << subs_[i].queuePidOfLastSend() << "/"
02539                      << subs_[i].queuePidOfLastReceive() 
02540                      << "</td><td id=\"p"<< i << "\">"
02541                      <<subs_[i].pid()<<"</td><td colspan=\"5\">" << subs_[i].reasonForFailed();
02542                 if(subs_[i].alive()!=0 && subs_[i].alive()!=-1000) 
02543                   {
02544                     if(autoRestartSlaves_ && subs_[i].restartCount()<=2) 
02545                       *out << " will restart in " << subs_[i].countdown() << " s";
02546                     else if(autoRestartSlaves_)
02547                       *out << " reached maximum restart count";
02548                     else *out << " autoRestart is disabled ";
02549                   }
02550                 *out << "</td><td" 
02551                      << ((subs_[i].params().eols<subs_[i].params().ls) ? 
02552                          " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")  
02553                      << ">" 
02554                      << subs_[i].params().eols  
02555                      << "</td><td>" << subs_[i].params().dqm 
02556                      << "</td><td>" << subs_[i].params().trp << "</td>";
02557                 pthread_mutex_unlock(&pickup_lock_);
02558               }
02559             *out << "</tr>";
02560           }
02561           catch(evf::Exception &e){
02562             *out << "<tr><td id=\"a"<< i << "\" " 
02563                  <<"bgcolor=\"#FFFF00\">NotResp</td><td>S</td><td>"<< subs_[i].queueId() << "<td>" 
02564                  << subs_[i].queueStatus() << "/"
02565                  << subs_[i].queueOccupancy() << "/"
02566                  << subs_[i].queuePidOfLastSend() << "/"
02567                  << subs_[i].queuePidOfLastReceive() 
02568                  << "</td><td id=\"p"<< i << "\">"
02569                  <<subs_[i].pid()<<"</td>";
02570           }
02571         }
02572       pthread_mutex_unlock(&start_lock_); 
02573     }
02574   }
02575       catch(evf::Exception &e)
02576       {
02577         LOG4CPLUS_INFO(getApplicationLogger(),"evf::Exception caught in microstate - " << e.what());    
02578       }
02579     catch(cms::Exception &e)
02580       {
02581         LOG4CPLUS_INFO(getApplicationLogger(),"cms::Exception caught in microstate - " << e.what());    
02582       }
02583     catch(std::exception &e)
02584       {
02585         LOG4CPLUS_INFO(getApplicationLogger(),"std::Exception caught in microstate - " << e.what());    
02586       }
02587     catch(...)
02588       {
02589         LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception caught in microstate - ");    
02590       }
02591 
02592 }
02593 
02594 
02595 void FUEventProcessor::updater(xgi::Input *in,xgi::Output *out) throw (xgi::exception::Exception)
02596 {
02597   using namespace utils;
02598 
02599   *out << updaterStatic_;
02600   mDiv(out,"loads");
02601   uptime(out);
02602   cDiv(out);
02603   mDiv(out,"st",fsm_.stateName()->toString());
02604   mDiv(out,"ru",runNumber_.toString());
02605   mDiv(out,"nsl",nbSubProcesses_.value_);
02606   mDiv(out,"nsr",nbSubProcessesReporting_.value_);
02607   mDiv(out,"cl");
02608   *out << getApplicationDescriptor()->getClassName() 
02609        << (nbSubProcesses_.value_ > 0 ? "MP " : " ");
02610   cDiv(out);
02611   mDiv(out,"in",getApplicationDescriptor()->getInstance());
02612   if(fsm_.stateName()->toString() != "Halted" && fsm_.stateName()->toString() != "halting"){
02613     mDiv(out,"hlt");
02614     *out << "<a href=\"" << configString_.toString() << "\">HLT Config</a>";
02615     cDiv(out);
02616     *out << std::endl;
02617   }
02618   else
02619     mDiv(out,"hlt","Not yet...");
02620 
02621   mDiv(out,"sq",squidPresent_.toString());
02622   mDiv(out,"vwl",(supervising_ ? "Active" : "Not Initialized"));
02623   mDiv(out,"mwl",evtProcessor_.wlMonitoring());
02624   if(nbProcessed != 0 && nbAccepted != 0)
02625     {
02626       mDiv(out,"tt",((xdata::UnsignedInteger32*)nbProcessed)->value_);
02627       mDiv(out,"ac",((xdata::UnsignedInteger32*)nbAccepted)->value_);
02628     }
02629   else
02630     {
02631       mDiv(out,"tt",0);
02632       mDiv(out,"ac",0);
02633     }
02634   if(!myProcess_)
02635     mDiv(out,"swl",(wlSummarizeActive_ ? "Active" : "Inactive"));
02636   else
02637     mDiv(out,"swl",(wlScalersActive_ ? "Active" : "Inactive"));
02638 
02639   mDiv(out,"idi",iDieUrl_.value_);
02640   if(vp_!=0){
02641     mDiv(out,"vpi",(unsigned int) vp_);
02642     if(vulture_->hasStarted()>=0)
02643       mDiv(out,"vul","Prowling");
02644     else
02645       mDiv(out,"vul","Dead");
02646   }
02647   else{
02648     mDiv(out,"vul",(vulture_==0 ? "Nope" : "Hatching"));
02649   }    
02650   if(evtProcessor_){
02651     mDiv(out,"ll");
02652     *out << evtProcessor_.lastLumi().ls
02653          << "," << evtProcessor_.lastLumi().proc << "," << evtProcessor_.lastLumi().acc;
02654     cDiv(out);
02655   }
02656   mDiv(out,"lg");
02657   for(unsigned int i = logRingIndex_; i<logRingSize_; i++)
02658     *out << logRing_[i] << std::endl;
02659   if(logWrap_)
02660     for(unsigned int i = 0; i<logRingIndex_; i++)
02661       *out << logRing_[i] << std::endl;
02662   cDiv(out);
02663   mDiv(out,"cha");
02664   if(cpustat_) *out << cpustat_->getChart();
02665   cDiv(out);
02666 }
02667 
02668 void FUEventProcessor::procStat(xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
02669 {
02670   evf::utils::procStat(out);
02671 }
02672 
02673 void FUEventProcessor::sendMessageOverMonitorQueue(MsgBuf &buf)
02674 {
02675   if(myProcess_) myProcess_->postSlave(buf,true);
02676 }
02677 
02678 void FUEventProcessor::makeStaticInfo()
02679 {
02680   using namespace utils;
02681   std::ostringstream ost;
02682   mDiv(&ost,"ve");
02683   ost<< "$Revision: 1.164 $ (" << edm::getReleaseVersion() <<")";
02684   cDiv(&ost);
02685   mDiv(&ost,"ou",outPut_.toString());
02686   mDiv(&ost,"sh",hasShMem_.toString());
02687   mDiv(&ost,"mw",hasModuleWebRegistry_.toString());
02688   mDiv(&ost,"sw",hasServiceWebRegistry_.toString());
02689   
02690   xdata::Serializable *monsleep = 0;
02691   xdata::Serializable *lstimeout = 0;
02692   try{
02693     monsleep  = applicationInfoSpace_->find("monSleepSec");
02694     lstimeout = applicationInfoSpace_->find("lsTimeOut");
02695   }
02696   catch(xdata::exception::Exception e){
02697   }
02698   
02699   if(monsleep!=0)
02700     mDiv(&ost,"ms",monsleep->toString());
02701   if(lstimeout!=0)
02702     mDiv(&ost,"lst",lstimeout->toString());
02703   char cbuf[sizeof(struct utsname)];
02704   struct utsname* buf = (struct utsname*)cbuf;
02705   uname(buf);
02706   mDiv(&ost,"sysinfo");
02707   ost << buf->sysname << " " << buf->nodename 
02708       << " " << buf->release << " " << buf->version << " " << buf->machine;
02709   cDiv(&ost);
02710   updaterStatic_ = ost.str();
02711 }
02712 
02713 void FUEventProcessor::handleSignalSlave(int sig, siginfo_t* info, void* c)
02714 {
02715   //notify master
02716   sem_post(sigmon_sem_);
02717 
02718   //sleep while master takes action
02719   sleep(2);
02720 
02721   //set up alarm if handler deadlocks on unsafe actions
02722   alarm(5);
02723   
02724   std::cout << "--- Slave EP signal handler caught signal " << sig << " process id is " << info->si_pid <<" ---" << std::endl;
02725   std::cout << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl;
02726   std::cout << "--- Stacktrace follows --" << std::endl;
02727   std::ostringstream stacktr;
02728   toolbox::stacktrace(20,stacktr);
02729   std::cout << stacktr.str();
02730   if (!rlimit_coresize_changed_)
02731     std::cout << "--- Dumping core." <<  " --- " << std::endl;
02732   else
02733     std::cout << "--- Core dump count exceeded on this FU. ---"<<std::endl;
02734  
02735   std::string hasdump = "";
02736   if (rlimit_coresize_changed_) hasdump = " (core dump disabled) ";
02737 
02738   LOG4CPLUS_ERROR(getApplicationLogger(),    "--- Slave EP signal handler caught signal " << sig << ". process id is " << getpid() 
02739                                           << " on node " << toolbox::net::getHostName() << " ---" << std::endl
02740                                           << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl
02741                                           << "--- Stacktrace follows"<< hasdump << " ---" << std::endl << stacktr.str()
02742                                           );
02743 
02744   //re-raise signal with default handler (will cause core dump if enabled)
02745   raise(sig);
02746 }
02747 
02748 
// XDAQ application instantiation hook for evf::FUEventProcessor. The macro is
// provided by the XDAQ framework. Presumably it defines the factory entry point
// the XDAQ executive uses to create this application — NOTE(review): confirm
// against the xdaq headers.
02749 XDAQ_INSTANTIATOR_IMPL(evf::FUEventProcessor)