CMS 3D CMS Logo

/data/doxygen/doxygen-1.7.3/gen/CMSSW_4_2_8/src/EventFilter/Processor/src/FUEventProcessor.cc

Go to the documentation of this file.
00001 
00002 //
00003 // FUEventProcessor
00004 // ----------------
00005 //
00007 
00008 #include "FUEventProcessor.h"
00009 #include "procUtils.h"
00010 #include "EventFilter/Utilities/interface/CPUStat.h"
00011 #include "EventFilter/Utilities/interface/RateStat.h"
00012 
00013 #include "EventFilter/Utilities/interface/Exception.h"
00014 
00015 #include "EventFilter/Message2log4cplus/interface/MLlog4cplus.h"
00016 #include "EventFilter/Modules/interface/FUShmDQMOutputService.h"
00017 #include "EventFilter/Utilities/interface/ServiceWebRegistry.h"
00018 #include "EventFilter/Utilities/interface/ServiceWeb.h"
00019 
00020 
00021 #include "FWCore/PluginManager/interface/ProblemTracker.h"
00022 #include "FWCore/PluginManager/interface/PresenceFactory.h"
00023 #include "FWCore/Utilities/interface/Presence.h"
00024 #include "FWCore/Utilities/interface/Exception.h"
00025 #include "FWCore/Version/interface/GetReleaseVersion.h"
00026 #include "FWCore/ServiceRegistry/interface/Service.h"
00027 
00028 #include "toolbox/BSem.h" 
00029 
00030 #include <boost/tokenizer.hpp>
00031 
00032 #include "xcept/tools.h"
00033 #include "xgi/Method.h"
00034 
00035 #include "cgicc/CgiDefs.h"
00036 #include "cgicc/Cgicc.h"
00037 #include "cgicc/FormEntry.h"
00038 
00039 
00040 #include <sys/wait.h>
00041 #include <sys/utsname.h>
00042 
00043 #include <typeinfo>
00044 #include <stdlib.h>
00045 #include <stdio.h>
00046 #include <errno.h>
00047 
00048 using namespace evf;
00049 using namespace cgicc;
00050 namespace toolbox {
00051   namespace mem {
00052     extern toolbox::BSem * _s_mutex_ptr_; 
00053   }
00054 }
00055 
00057 // construction/destruction
00059 
00060 //______________________________________________________________________________
00061 FUEventProcessor::FUEventProcessor(xdaq::ApplicationStub *s) 
00062   : xdaq::Application(s)
00063   , fsm_(this)
00064   , log_(getApplicationLogger())
00065   , evtProcessor_(log_, getApplicationDescriptor()->getInstance())
00066   , runNumber_(0)
00067   , epInitialized_(false)
00068   , outPut_(true)
00069   , autoRestartSlaves_(false)
00070   , slaveRestartDelaySecs_(10)
00071   , hasShMem_(true)
00072   , hasPrescaleService_(true)
00073   , hasModuleWebRegistry_(true)
00074   , hasServiceWebRegistry_(true)
00075   , isRunNumberSetter_(true)
00076   , iDieStatisticsGathering_(false)
00077   , outprev_(true)
00078   , exitOnError_(true)
00079   , reasonForFailedState_()
00080   , squidnet_(3128,"https://localhost:8000/RELEASE-NOTES.txt")
00081   , logRing_(logRingSize_)
00082   , logRingIndex_(logRingSize_)
00083   , logWrap_(false)
00084   , nbSubProcesses_(0)
00085   , nbSubProcessesReporting_(0)
00086   , nblive_(0)
00087   , nbdead_(0)
00088   , nbTotalDQM_(0)
00089   , wlReceiving_(0)
00090   , asReceiveMsgAndExecute_(0)
00091   , receiving_(false) 
00092   , wlReceivingMonitor_(0)
00093   , asReceiveMsgAndRead_(0)
00094   , receivingM_(false)
00095   , myProcess_(0)
00096   , wlSupervising_(0)
00097   , asSupervisor_(0)
00098   , supervising_(false)
00099   , monitorInfoSpace_(0)
00100   , monitorLegendaInfoSpace_(0)
00101   , applicationInfoSpace_(0)
00102   , nbProcessed(0)
00103   , nbAccepted(0)
00104   , scalersInfoSpace_(0)
00105   , scalersLegendaInfoSpace_(0)
00106   , wlScalers_(0)
00107   , asScalers_(0)
00108   , wlScalersActive_(false)
00109   , scalersUpdates_(0)
00110   , wlSummarize_(0)
00111   , asSummarize_(0)
00112   , wlSummarizeActive_(false)
00113   , superSleepSec_(1)
00114   , iDieUrl_("none")
00115   , vulture_(0)
00116   , vp_(0)
00117   , cpustat_(0)
00118   , ratestat_(0)
00119   , master_message_prg_(0,MSQM_MESSAGE_TYPE_PRG)
00120   , master_message_prr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_PRR)
00121   , slave_message_prr_(sizeof(prg),MSQS_MESSAGE_TYPE_PRR)
00122   , master_message_trr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_TRR)
00123 {
00124   using namespace utils;
00125 
00126   names_.push_back("nbProcessed"    );
00127   names_.push_back("nbAccepted"     );
00128   names_.push_back("epMacroStateInt");
00129   names_.push_back("epMicroStateInt");
00130   // create pipe for web communication
00131   int retpipe = pipe(anonymousPipe_);
00132   if(retpipe != 0)
00133         LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to create pipe");
00134   // check squid presence
00135   squidPresent_ = squidnet_.check();
00136   //pass application parameters to FWEPWrapper
00137   evtProcessor_.setAppDesc(getApplicationDescriptor());
00138   evtProcessor_.setAppCtxt(getApplicationContext());
00139   // bind relevant callbacks to finite state machine
00140   fsm_.initialize<evf::FUEventProcessor>(this);
00141   
00142   //set sourceId_
00143   url_ =
00144     getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00145     getApplicationDescriptor()->getURN();
00146   class_   =getApplicationDescriptor()->getClassName();
00147   instance_=getApplicationDescriptor()->getInstance();
00148   sourceId_=class_.toString()+"_"+instance_.toString();
00149   LOG4CPLUS_INFO(getApplicationLogger(),sourceId_ <<" constructor"         );
00150   LOG4CPLUS_INFO(getApplicationLogger(),"CMSSW_BASE:"<<getenv("CMSSW_BASE"));
00151   
00152   getApplicationDescriptor()->setAttribute("icon", "/evf/images/epicon.jpg");
00153   
00154   xdata::InfoSpace *ispace = getApplicationInfoSpace();
00155   applicationInfoSpace_ = ispace;
00156 
00157   // default configuration
00158   ispace->fireItemAvailable("parameterSet",         &configString_                );
00159   ispace->fireItemAvailable("epInitialized",        &epInitialized_               );
00160   ispace->fireItemAvailable("stateName",             fsm_.stateName()             );
00161   ispace->fireItemAvailable("runNumber",            &runNumber_                   );
00162   ispace->fireItemAvailable("outputEnabled",        &outPut_                      );
00163 
00164   ispace->fireItemAvailable("hasSharedMemory",      &hasShMem_);
00165   ispace->fireItemAvailable("hasPrescaleService",   &hasPrescaleService_          );
00166   ispace->fireItemAvailable("hasModuleWebRegistry", &hasModuleWebRegistry_        );
00167   ispace->fireItemAvailable("hasServiceWebRegistry", &hasServiceWebRegistry_      );
00168   ispace->fireItemAvailable("isRunNumberSetter",    &isRunNumberSetter_           );
00169   ispace->fireItemAvailable("iDieStatisticsGathering",   &iDieStatisticsGathering_);
00170   ispace->fireItemAvailable("rcmsStateListener",     fsm_.rcmsStateListener()     );
00171   ispace->fireItemAvailable("foundRcmsStateListener",fsm_.foundRcmsStateListener());
00172   ispace->fireItemAvailable("nbSubProcesses",       &nbSubProcesses_              );
00173   ispace->fireItemAvailable("nbSubProcessesReporting",&nbSubProcessesReporting_   );
00174   ispace->fireItemAvailable("superSleepSec",        &superSleepSec_               );
00175   ispace->fireItemAvailable("autoRestartSlaves",    &autoRestartSlaves_           );
00176   ispace->fireItemAvailable("slaveRestartDelaySecs",&slaveRestartDelaySecs_       );
00177   ispace->fireItemAvailable("iDieUrl",              &iDieUrl_                     );
00178   
00179   // Add infospace listeners for exporting data values
00180   getApplicationInfoSpace()->addItemChangedListener("parameterSet",        this);
00181   getApplicationInfoSpace()->addItemChangedListener("outputEnabled",       this);
00182 
00183   // findRcmsStateListener
00184   fsm_.findRcmsStateListener();
00185   
00186   // initialize monitoring infospace
00187 
00188   std::string monInfoSpaceName="evf-eventprocessor-status-monitor";
00189   toolbox::net::URN urn = this->createQualifiedInfoSpace(monInfoSpaceName);
00190   monitorInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00191 
00192   std::string monLegendaInfoSpaceName="evf-eventprocessor-status-legenda";
00193   urn = this->createQualifiedInfoSpace(monLegendaInfoSpaceName);
00194   monitorLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00195 
00196   
00197   monitorInfoSpace_->fireItemAvailable("url",                      &url_            );
00198   monitorInfoSpace_->fireItemAvailable("class",                    &class_          );
00199   monitorInfoSpace_->fireItemAvailable("instance",                 &instance_       );
00200   monitorInfoSpace_->fireItemAvailable("runNumber",                &runNumber_      );
00201   monitorInfoSpace_->fireItemAvailable("stateName",                 fsm_.stateName()); 
00202 
00203   monitorInfoSpace_->fireItemAvailable("squidPresent",             &squidPresent_   );
00204 
00205   std::string scalersInfoSpaceName="evf-eventprocessor-scalers-monitor";
00206   urn = this->createQualifiedInfoSpace(scalersInfoSpaceName);
00207   scalersInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00208 
00209   std::string scalersLegendaInfoSpaceName="evf-eventprocessor-scalers-legenda";
00210   urn = this->createQualifiedInfoSpace(scalersLegendaInfoSpaceName);
00211   scalersLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00212 
00213 
00214 
00215   evtProcessor_.setScalersInfoSpace(scalersInfoSpace_,scalersLegendaInfoSpace_);
00216   scalersInfoSpace_->fireItemAvailable("instance", &instance_);
00217 
00218   evtProcessor_.setApplicationInfoSpace(ispace);
00219   evtProcessor_.setMonitorInfoSpace(monitorInfoSpace_,monitorLegendaInfoSpace_);
00220   evtProcessor_.publishConfigAndMonitorItems(nbSubProcesses_.value_!=0);
00221 
00222   //subprocess state vectors for MP
00223   monitorInfoSpace_->fireItemAvailable("epMacroStateInt",             &spMStates_); 
00224   monitorInfoSpace_->fireItemAvailable("epMicroStateInt",             &spmStates_); 
00225   
00226   // Bind web interface
00227   xgi::bind(this, &FUEventProcessor::css,              "styles.css");
00228   xgi::bind(this, &FUEventProcessor::defaultWebPage,   "Default"   );
00229   xgi::bind(this, &FUEventProcessor::spotlightWebPage, "Spotlight" );
00230   xgi::bind(this, &FUEventProcessor::scalersWeb,       "scalersWeb");
00231   xgi::bind(this, &FUEventProcessor::pathNames,        "pathNames" );
00232   xgi::bind(this, &FUEventProcessor::subWeb,           "SubWeb"    );
00233   xgi::bind(this, &FUEventProcessor::moduleWeb,        "moduleWeb" );
00234   xgi::bind(this, &FUEventProcessor::serviceWeb,       "serviceWeb");
00235   xgi::bind(this, &FUEventProcessor::microState,       "microState");
00236   xgi::bind(this, &FUEventProcessor::updater,          "updater"   );
00237   xgi::bind(this, &FUEventProcessor::procStat,         "procStat"  );
00238 
00239   // instantiate the plugin manager, not referenced here after!
00240 
00241   edm::AssertHandler ah;
00242 
00243   try{
00244     LOG4CPLUS_DEBUG(getApplicationLogger(),
00245                     "Trying to create message service presence ");
00246     edm::PresenceFactory *pf = edm::PresenceFactory::get();
00247     if(pf != 0) {
00248       pf->makePresence("MessageServicePresence").release();
00249     }
00250     else {
00251       LOG4CPLUS_ERROR(getApplicationLogger(),
00252                       "Unable to create message service presence ");
00253     }
00254   } 
00255   catch(...) {
00256     LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
00257   }
00258   ML::MLlog4cplus::setAppl(this);      
00259 
00260   typedef std::set<xdaq::ApplicationDescriptor*> AppDescSet_t;
00261   typedef AppDescSet_t::iterator                 AppDescIter_t;
00262   
00263   AppDescSet_t rcms=
00264     getApplicationContext()->getDefaultZone()->
00265     getApplicationDescriptors("RCMSStateListener");
00266   if(rcms.size()==0) 
00267     {
00268       LOG4CPLUS_WARN(getApplicationLogger(),
00269                        "MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
00270       //        localLog("-W- MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
00271     }
00272   else
00273     {
00274       AppDescIter_t it = rcms.begin();
00275       evtProcessor_.setRcms(*it);
00276     }
00277   pthread_mutex_init(&start_lock_,0);
00278   pthread_mutex_init(&stop_lock_,0);
00279   pthread_mutex_init(&pickup_lock_,0);
00280 
00281   makeStaticInfo();
00282   startSupervisorLoop();  
00283 
00284   if(vulture_==0) vulture_ = new Vulture(true);
00285 
00287 
00288   AppDescSet_t setOfiDie=
00289     getApplicationContext()->getDefaultZone()->
00290     getApplicationDescriptors("evf::iDie");
00291   
00292   for (AppDescIter_t it=setOfiDie.begin();it!=setOfiDie.end();++it)
00293     if ((*it)->getInstance()==0) // there has to be only one instance of iDie
00294       iDieUrl_ = (*it)->getContextDescriptor()->getURL() + "/" + (*it)->getURN();
00295 }
00296 //___________here ends the *huge* constructor___________________________________
00297 
00298 
00299 //______________________________________________________________________________
00300 FUEventProcessor::~FUEventProcessor()
00301 {
00302   // no longer needed since the wrapper is a member of the class and one can rely on 
00303   // implicit destructor - to be revised - at any rate the most common exit path is via "kill"...
00304   //  if (evtProcessor_) delete evtProcessor_;
00305   if(vulture_ != 0) delete vulture_;
00306 }
00307 
00308 
00310 // implementation of member functions
00312 
00313 
00314 //______________________________________________________________________________
00315 bool FUEventProcessor::configuring(toolbox::task::WorkLoop* wl)
00316 {
00317 //   std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
00318 //          << ((instance_.value_==0) ? 0x8 : 0) << " "
00319 //          << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
00320 //          << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
00321 //          << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
00322   unsigned short smap 
00323     = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00324     + ((instance_.value_==0) ? 0x8 : 0)
00325     + (hasServiceWebRegistry_.value_ ? 0x4 : 0) 
00326     + (hasModuleWebRegistry_.value_ ? 0x2 : 0) 
00327     + (hasPrescaleService_.value_ ? 0x1 : 0);
00328   if(nbSubProcesses_.value_==0) 
00329     {
00330       spMStates_.setSize(1); 
00331       spmStates_.setSize(1); 
00332     }
00333   else
00334     {
00335       spMStates_.setSize(nbSubProcesses_.value_);
00336       spmStates_.setSize(nbSubProcesses_.value_);
00337       for(unsigned int i = 0; i < spMStates_.size(); i++)
00338         {
00339           spMStates_[i] = edm::event_processor::sInit; 
00340           spmStates_[i] = 0; 
00341         }
00342     }
00343   try {
00344     LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ...");
00345     std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00346     epInitialized_=true;
00347     if(evtProcessor_)
00348       {
00349         // moved to wrapper class
00350         configuration_ = evtProcessor_.configuration();
00351         if(nbSubProcesses_.value_==0) evtProcessor_.startMonitoringWorkLoop(); 
00352         evtProcessor_->beginJob(); 
00353         if(cpustat_) {delete cpustat_; cpustat_=0;}
00354         cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00355                                iDieUrl_.value_);
00356         if(ratestat_) {delete ratestat_; ratestat_=0;}
00357         ratestat_ = new RateStat(iDieUrl_.value_);
00358         if(iDieStatisticsGathering_.value_){
00359           try{
00360             cpustat_->sendLegenda(evtProcessor_.getmicromap());
00361             xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00362             if(legenda !=0){
00363               std::string slegenda = ((xdata::String*)legenda)->value_;
00364               ratestat_->sendLegenda(slegenda);
00365             }
00366 
00367           }
00368           catch(evf::Exception &e){
00369             LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
00370                            << e.what());
00371           }
00372         }
00373         
00374         fsm_.fireEvent("ConfigureDone",this);
00375         LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!");
00376         localLog("-I- Configuration completed");
00377 
00378       }
00379   }
00380   catch (xcept::Exception &e) {
00381     reasonForFailedState_ = "configuring FAILED: " + (std::string)e.what();
00382     fsm_.fireFailed(reasonForFailedState_,this);
00383     localLog(reasonForFailedState_);
00384   }
00385   catch(cms::Exception &e) {
00386     reasonForFailedState_ = e.explainSelf();
00387     fsm_.fireFailed(reasonForFailedState_,this);
00388     localLog(reasonForFailedState_);
00389   }    
00390   catch(std::exception &e) {
00391     reasonForFailedState_ = e.what();
00392     fsm_.fireFailed(reasonForFailedState_,this);
00393     localLog(reasonForFailedState_);
00394   }
00395   catch(...) {
00396     fsm_.fireFailed("Unknown Exception",this);
00397   }
00398 
00399   if(vulture_!=0 && vp_ == 0) vp_ = vulture_->makeProcess();
00400 
00401   return false;
00402 }
00403 
00404 
00405 
00406 
00407 //______________________________________________________________________________
00408 bool FUEventProcessor::enabling(toolbox::task::WorkLoop* wl)
00409 {
00410   nbTotalDQM_ = 0;
00411   scalersUpdates_ = 0;
00412 //   std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
00413 //          << ((instance_.value_==0) ? 0x8 : 0) << " "
00414 //          << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
00415 //          << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
00416 //          << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
00417   unsigned short smap 
00418     = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00419     + ((instance_.value_==0) ? 0x8 : 0)
00420     + (hasServiceWebRegistry_.value_ ? 0x4 : 0) 
00421     + (hasModuleWebRegistry_.value_ ? 0x2 : 0) 
00422     + (hasPrescaleService_.value_ ? 0x1 : 0);
00423 
00424   LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling...");
00425   if(!epInitialized_){
00426     evtProcessor_.forceInitEventProcessorMaybe();
00427   }
00428   std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00429 
00430   if(!epInitialized_){
00431     evtProcessor_->beginJob(); 
00432     if(cpustat_) {delete cpustat_; cpustat_=0;}
00433     cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00434                            iDieUrl_.value_);
00435     if(iDieStatisticsGathering_.value_){
00436       try{
00437         cpustat_->sendLegenda(evtProcessor_.getmicromap());
00438         xdata::Serializable *legenda = scalersInfoSpace_->find("scalersLegenda");
00439         if(legenda !=0){
00440           std::string slegenda = ((xdata::String*)legenda)->value_;
00441           ratestat_->sendLegenda(slegenda);
00442        }
00443       }
00444       catch(evf::Exception &e){
00445         LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
00446                        << e.what());
00447       }
00448     }
00449     if(ratestat_) {delete ratestat_; ratestat_=0;}
00450     ratestat_ = new RateStat(iDieUrl_.value_);
00451     epInitialized_ = true;
00452   }
00453   configuration_ = evtProcessor_.configuration(); // get it again after init has been carried out...
00454   evtProcessor_.resetLumiSectionReferenceIndex();
00455   //classic appl will return here 
00456   if(nbSubProcesses_.value_==0) return enableClassic();
00457   //protect manipulation of subprocess array
00458   pthread_mutex_lock(&start_lock_);
00459   subs_.clear();
00460   subs_.resize(nbSubProcesses_.value_); // this should not be necessary
00461   pid_t retval = -1;
00462   for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00463     {
00464       subs_[i]=SubProcess(i,retval); //this will replace all the scattered variables
00465     }
00466   pthread_mutex_unlock(&start_lock_);
00467 
00468   for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00469     {
00470       retval = subs_[i].forkNew();
00471       if(retval==0)
00472         {
00473           myProcess_ = &subs_[i];
00474           // dirty hack: delete/recreate global binary semaphore for later use in child
00475           delete toolbox::mem::_s_mutex_ptr_;
00476           toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
00477           int retval = pthread_mutex_destroy(&stop_lock_);
00478           if(retval != 0) perror("error");
00479           retval = pthread_mutex_init(&stop_lock_,0);
00480           if(retval != 0) perror("error");
00481           fsm_.disableRcmsStateNotification();
00482           return enableMPEPSlave();
00483           // the loop is broken in the child 
00484         }
00485     }
00486   
00487   startSummarizeWorkLoop();
00488   vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00489 
00490   LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00491   fsm_.fireEvent("EnableDone",this);
00492   localLog("-I- Start completed");
00493   return false;
00494 }
00495 
00496 
00497 //______________________________________________________________________________
00498 bool FUEventProcessor::stopping(toolbox::task::WorkLoop* wl)
00499 {
00500   if(nbSubProcesses_.value_!=0) 
00501     stopSlavesAndAcknowledge();
00502   vulture_->stop();
00503   return stopClassic();
00504 }
00505 
00506 
00507 //______________________________________________________________________________
00508 bool FUEventProcessor::halting(toolbox::task::WorkLoop* wl)
00509 {
00510   LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ...");
00511   if(nbSubProcesses_.value_!=0) 
00512     stopSlavesAndAcknowledge();
00513   try{
00514     evtProcessor_.stopAndHalt();
00515   }
00516   catch (evf::Exception &e) {
00517     reasonForFailedState_ = "halting FAILED: " + (std::string)e.what();
00518     localLog(reasonForFailedState_);
00519     fsm_.fireFailed(reasonForFailedState_,this);
00520   }
00521   //  if(hasShMem_) detachDqmFromShm();
00522 
00523   LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!");
00524   fsm_.fireEvent("HaltDone",this);
00525 
00526   localLog("-I- Halt completed");
00527   return false;
00528 }
00529 
00530 
00531 //______________________________________________________________________________
00532 xoap::MessageReference FUEventProcessor::fsmCallback(xoap::MessageReference msg)
00533   throw (xoap::exception::Exception)
00534 {
00535   return fsm_.commandCallback(msg);
00536 }
00537 
00538 
00539 //______________________________________________________________________________
00540 void FUEventProcessor::actionPerformed(xdata::Event& e)
00541 {
00542 
00543   if (e.type()=="ItemChangedEvent" && fsm_.stateName()->toString()!="Halted") {
00544     std::string item = dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00545     
00546     if ( item == "parameterSet") {
00547       LOG4CPLUS_WARN(getApplicationLogger(),
00548                      "HLT Menu changed, force reinitialization of EventProcessor");
00549       epInitialized_ = false;
00550     }
00551     if ( item == "outputEnabled") {
00552       if(outprev_ != outPut_) {
00553         LOG4CPLUS_WARN(getApplicationLogger(),
00554                        (outprev_ ? "Disabling " : "Enabling ")<<"global output");
00555         evtProcessor_->enableEndPaths(outPut_);
00556         outprev_ = outPut_;
00557       }
00558     }
00559     if (item == "globalInputPrescale") {
00560       LOG4CPLUS_WARN(getApplicationLogger(),
00561                      "Setting global input prescale has no effect "
00562                      <<"in this version of the code");
00563     }
00564     if ( item == "globalOutputPrescale") {
00565       LOG4CPLUS_WARN(getApplicationLogger(),
00566                      "Setting global output prescale has no effect "
00567                      <<"in this version of the code");
00568     }
00569   }
00570   
00571 }
00572 
00573 //______________________________________________________________________________
00574 void FUEventProcessor::subWeb(xgi::Input  *in, xgi::Output *out)
00575 {
00576   using namespace cgicc;
00577   pid_t pid = 0;
00578   std::ostringstream ost;
00579   ost << "&";
00580 
00581   Cgicc cgi(in);
00582   internal::MyCgi *mycgi = (internal::MyCgi*)in;
00583   for(std::map<std::string, std::string, std::less<std::string> >::iterator mit = 
00584         mycgi->getEnvironment().begin();
00585       mit != mycgi->getEnvironment().end(); mit++)
00586     ost << mit->first << "%" << mit->second << ";";
00587   std::vector<FormEntry> els = cgi.getElements() ;
00588   std::vector<FormEntry> el1;
00589   cgi.getElement("method",el1);
00590   std::vector<FormEntry> el2;
00591   cgi.getElement("process",el2);
00592   if(el1.size()!=0) {
00593     std::string meth = el1[0].getValue();
00594     if(el2.size()!=0) {
00595       unsigned int i = 0;
00596       std::string mod = el2[0].getValue();
00597       pid = atoi(mod.c_str()); // get the process id to be polled
00598       for(; i < subs_.size(); i++)
00599         if(subs_[i].pid()==pid) break;
00600       if(i>=subs_.size()){ //process was not found, let the browser know
00601         *out << "ERROR 404 : Process " << pid << " Not Found !" << std::endl;
00602         return;
00603       } 
00604       if(subs_[i].alive() != 1){
00605         *out << "ERROR 405 : Process " << pid << " Not Alive !" << std::endl;
00606         return;
00607       }
00608       MsgBuf msg1(meth.length()+ost.str().length(),MSQM_MESSAGE_TYPE_WEB);
00609       strcpy(msg1->mtext,meth.c_str());
00610       strcpy(msg1->mtext+meth.length(),ost.str().c_str());
00611       subs_[i].post(msg1,true);
00612       unsigned int keep_supersleep_original_value = superSleepSec_.value_;
00613       superSleepSec_.value_=10*keep_supersleep_original_value;
00614       MsgBuf msg(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_WEB);
00615       bool done = false;
00616       std::vector<char *>pieces;
00617       while(!done){
00618         unsigned long retval1 = subs_[i].rcvNonBlocking(msg,true);
00619         if(retval1 == MSGQ_MESSAGE_TYPE_RANGE){
00620 	  ::sleep(1);
00621           continue;
00622         }
00623         unsigned int nbytes = atoi(msg->mtext);
00624         if(nbytes < MAX_PIPE_BUFFER_SIZE) done = true; // this will break the while loop
00625         char *buf= new char[nbytes];
00626         ssize_t retval = read(anonymousPipe_[PIPE_READ],buf,nbytes);
00627         if(retval!=nbytes) std::cout 
00628           << "CAREFUL HERE, read less bytes than expected from pipe in subWeb" << std::endl;
00629         pieces.push_back(buf);
00630       }
00631       superSleepSec_.value_=keep_supersleep_original_value;
00632       for(unsigned int j = 0; j < pieces.size(); j++){
00633         *out<<pieces[j];    // chain the buffers into the output strstream
00634         delete[] pieces[j]; //make sure to release all buffers used for reading the pipe
00635       }
00636     }
00637   }
00638 }
00639 
00640 
00641 //______________________________________________________________________________
00642 void FUEventProcessor::defaultWebPage(xgi::Input  *in, xgi::Output *out)
00643   throw (xgi::exception::Exception)
00644 {
00645 
00646 
00647   *out << "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">" 
00648        << "<html><head><title>" << getApplicationDescriptor()->getClassName() << (nbSubProcesses_.value_ > 0 ? "MP " : " ") 
00649        << getApplicationDescriptor()->getInstance() << "</title>"
00650        << "<meta https-equiv=\"REFRESH\" content=\"0;url=/evf/html/defaultBasePage.html\">"
00651        << "</head></html>";
00652 }
00653 
00654 
00655 //______________________________________________________________________________
00656 
00657 
00658 void FUEventProcessor::spotlightWebPage(xgi::Input  *in, xgi::Output *out)
00659   throw (xgi::exception::Exception)
00660 {
00661 
00662   std::string urn = getApplicationDescriptor()->getURN();
00663 
00664   *out << "<!-- base href=\"/" <<  urn
00665        << "\"> -->" << std::endl;
00666   *out << "<html>"                                                   << std::endl;
00667   *out << "<head>"                                                   << std::endl;
00668   *out << "<link type=\"text/css\" rel=\"stylesheet\"";
00669   *out << " href=\"/evf/html/styles.css\"/>"                   << std::endl;
00670   *out << "<title>" << getApplicationDescriptor()->getClassName() 
00671        << getApplicationDescriptor()->getInstance() 
00672        << " MAIN</title>"     << std::endl;
00673   *out << "</head>"                                                  << std::endl;
00674   *out << "<body>"                                                   << std::endl;
00675   *out << "<table border=\"0\" width=\"100%\">"                      << std::endl;
00676   *out << "<tr>"                                                     << std::endl;
00677   *out << "  <td align=\"left\">"                                    << std::endl;
00678   *out << "    <img"                                                 << std::endl;
00679   *out << "     align=\"middle\""                                    << std::endl;
00680   *out << "     src=\"/evf/images/spoticon.jpg\""                            << std::endl;
00681   *out << "     alt=\"main\""                                        << std::endl;
00682   *out << "     width=\"64\""                                        << std::endl;
00683   *out << "     height=\"64\""                                       << std::endl;
00684   *out << "     border=\"\"/>"                                       << std::endl;
00685   *out << "    <b>"                                                  << std::endl;
00686   *out << getApplicationDescriptor()->getClassName() 
00687        << getApplicationDescriptor()->getInstance()                  << std::endl;
00688   *out << "      " << fsm_.stateName()->toString()                   << std::endl;
00689   *out << "    </b>"                                                 << std::endl;
00690   *out << "  </td>"                                                  << std::endl;
00691   *out << "  <td width=\"32\">"                                      << std::endl;
00692   *out << "    <a href=\"/urn:xdaq-application:lid=3\">"             << std::endl;
00693   *out << "      <img"                                               << std::endl;
00694   *out << "       align=\"middle\""                                  << std::endl;
00695   *out << "       src=\"/hyperdaq/images/HyperDAQ.jpg\""             << std::endl;
00696   *out << "       alt=\"HyperDAQ\""                                  << std::endl;
00697   *out << "       width=\"32\""                                      << std::endl;
00698   *out << "       height=\"32\""                                     << std::endl;
00699   *out << "       border=\"\"/>"                                     << std::endl;
00700   *out << "    </a>"                                                 << std::endl;
00701   *out << "  </td>"                                                  << std::endl;
00702   *out << "  <td width=\"32\">"                                      << std::endl;
00703   *out << "  </td>"                                                  << std::endl;
00704   *out << "  <td width=\"32\">"                                      << std::endl;
00705   *out << "    <a href=\"/" << urn << "/\">"                         << std::endl;
00706   *out << "      <img"                                               << std::endl;
00707   *out << "       align=\"middle\""                                  << std::endl;
00708   *out << "       src=\"/evf/images/epicon.jpg\""                    << std::endl;
00709   *out << "       alt=\"main\""                                      << std::endl;
00710   *out << "       width=\"32\""                                      << std::endl;
00711   *out << "       height=\"32\""                                     << std::endl;
00712   *out << "       border=\"\"/>"                                     << std::endl;
00713   *out << "    </a>"                                                 << std::endl;
00714   *out << "  </td>"                                                  << std::endl;
00715   *out << "</tr>"                                                    << std::endl;
00716   *out << "</table>"                                                 << std::endl;
00717 
00718   *out << "<hr/>"                                                    << std::endl;
00719   
00720   std::ostringstream ost;
00721   if(myProcess_) 
00722     ost << "/SubWeb?process=" << getpid() << "&method=moduleWeb&";
00723   else
00724     ost << "/moduleWeb?";
00725   urn += ost.str();
00726   if(evtProcessor_ && (myProcess_ || nbSubProcesses_.value_==0))
00727     evtProcessor_.taskWebPage(in,out,urn);
00728   else if(evtProcessor_)
00729     evtProcessor_.summaryWebPage(in,out,urn);
00730   else
00731     *out << "<td>HLT Unconfigured</td>" << std::endl;
00732   *out << "</table>"                                                 << std::endl;
00733   
00734   *out << "<br><textarea rows=" << 10 << " cols=80 scroll=yes>"      << std::endl;
00735   *out << configuration_                                             << std::endl;
00736   *out << "</textarea><P>"                                           << std::endl;
00737   
00738   *out << "</body>"                                                  << std::endl;
00739   *out << "</html>"                                                  << std::endl;
00740 
00741 
00742 }
00743 void FUEventProcessor::scalersWeb(xgi::Input  *in, xgi::Output *out)
00744   throw (xgi::exception::Exception)
00745 {
00746 
00747   out->getHTTPResponseHeader().addHeader( "Content-Type",
00748                                           "application/octet-stream" );
00749   out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
00750                                           "binary" );
00751   if(evtProcessor_ != 0){
00752     out->write( (char*)(evtProcessor_.getPackedTriggerReportAsStruct()), sizeof(TriggerReportStatic) );
00753   }
00754 }
00755 
00756 void FUEventProcessor::pathNames(xgi::Input  *in, xgi::Output *out)
00757   throw (xgi::exception::Exception)
00758 {
00759 
00760   if(evtProcessor_ != 0){
00761     xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00762     if(legenda !=0){
00763       std::string slegenda = ((xdata::String*)legenda)->value_;
00764       *out << slegenda << std::endl;
00765     }
00766   }
00767 }
00768 
00769 void FUEventProcessor::attachDqmToShm() throw (evf::Exception)  
00770 {
00771   std::string errmsg;
00772   bool success = false;
00773   try {
00774     edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
00775     if(edm::Service<FUShmDQMOutputService>().isAvailable())
00776       success = edm::Service<FUShmDQMOutputService>()->attachToShm();
00777     if (!success) errmsg = "Failed to attach DQM service to shared memory";
00778   }
00779   catch (cms::Exception& e) {
00780     errmsg = "Failed to attach DQM service to shared memory: " + (std::string)e.what();
00781   }
00782   if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
00783 }
00784 
00785 
00786 
00787 void FUEventProcessor::detachDqmFromShm() throw (evf::Exception)
00788 {
00789   std::string errmsg;
00790   bool success = false;
00791   try {
00792     edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
00793     if(edm::Service<FUShmDQMOutputService>().isAvailable())
00794       success = edm::Service<FUShmDQMOutputService>()->detachFromShm();
00795     if (!success) errmsg = "Failed to detach DQM service from shared memory";
00796   }
00797   catch (cms::Exception& e) {
00798     errmsg = "Failed to detach DQM service from shared memory: " + (std::string)e.what();
00799   }
00800   if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
00801 }
00802 
00803 
00804 std::string FUEventProcessor::logsAsString()
00805 {
00806   std::ostringstream oss;
00807   if(logWrap_)
00808     {
00809       for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
00810         oss << logRing_[i] << std::endl;
00811       for(unsigned int i = 0; i <  logRingIndex_; i++)
00812         oss << logRing_[i] << std::endl;
00813     }
00814   else
00815       for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
00816         oss << logRing_[i] << std::endl;
00817     
00818   return oss.str();
00819 }
00820   
00821 void FUEventProcessor::localLog(std::string m)
00822 {
00823   timeval tv;
00824 
00825   gettimeofday(&tv,0);
00826   tm *uptm = localtime(&tv.tv_sec);
00827   char datestring[256];
00828   strftime(datestring, sizeof(datestring),"%c", uptm);
00829 
00830   if(logRingIndex_ == 0){logWrap_ = true; logRingIndex_ = logRingSize_;}
00831   logRingIndex_--;
00832   std::ostringstream timestamp;
00833   timestamp << " at " << datestring;
00834   m += timestamp.str();
00835   logRing_[logRingIndex_] = m;
00836 }
00837 
00838 void FUEventProcessor::startSupervisorLoop()
00839 {
00840   try {
00841     wlSupervising_=
00842       toolbox::task::getWorkLoopFactory()->getWorkLoop("Supervisor",
00843                                                        "waiting");
00844     if (!wlSupervising_->isActive()) wlSupervising_->activate();
00845     asSupervisor_ = toolbox::task::bind(this,&FUEventProcessor::supervisor,
00846                                         "Supervisor");
00847     wlSupervising_->submit(asSupervisor_);
00848     supervising_ = true;
00849   }
00850   catch (xcept::Exception& e) {
00851     std::string msg = "Failed to start workloop 'Supervisor'.";
00852     XCEPT_RETHROW(evf::Exception,msg,e);
00853   }
00854 }
00855 
00856 void FUEventProcessor::startReceivingLoop()
00857 {
00858   try {
00859     wlReceiving_=
00860       toolbox::task::getWorkLoopFactory()->getWorkLoop("Receiving",
00861                                                        "waiting");
00862     if (!wlReceiving_->isActive()) wlReceiving_->activate();
00863     asReceiveMsgAndExecute_ = toolbox::task::bind(this,&FUEventProcessor::receiving,
00864                                         "Receiving");
00865     wlReceiving_->submit(asReceiveMsgAndExecute_);
00866     receiving_ = true;
00867   }
00868   catch (xcept::Exception& e) {
00869     std::string msg = "Failed to start workloop 'Receiving'.";
00870     XCEPT_RETHROW(evf::Exception,msg,e);
00871   }
00872 }
00873 void FUEventProcessor::startReceivingMonitorLoop()
00874 {
00875   try {
00876     wlReceivingMonitor_=
00877       toolbox::task::getWorkLoopFactory()->getWorkLoop("ReceivingM",
00878                                                        "waiting");
00879     if (!wlReceivingMonitor_->isActive()) 
00880       wlReceivingMonitor_->activate();
00881     asReceiveMsgAndRead_ = 
00882       toolbox::task::bind(this,&FUEventProcessor::receivingAndMonitor,
00883                           "ReceivingM");
00884     wlReceivingMonitor_->submit(asReceiveMsgAndRead_);
00885     receivingM_ = true;
00886   }
00887   catch (xcept::Exception& e) {
00888     std::string msg = "Failed to start workloop 'ReceivingM'.";
00889     XCEPT_RETHROW(evf::Exception,msg,e);
00890   }
00891 }
00892 
00893 bool FUEventProcessor::receiving(toolbox::task::WorkLoop *)
00894 {
00895   MsgBuf msg;
00896   try{
00897     myProcess_->rcvSlave(msg,false); //will receive only messages from Master
00898     if(msg->mtype==MSQM_MESSAGE_TYPE_STOP)
00899       {
00900         pthread_mutex_lock(&stop_lock_);
00901         fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
00902         try{
00903           LOG4CPLUS_DEBUG(getApplicationLogger(),
00904                           "Trying to create message service presence ");
00905           edm::PresenceFactory *pf = edm::PresenceFactory::get();
00906           if(pf != 0) {
00907             pf->makePresence("MessageServicePresence").release();
00908           }
00909           else {
00910             LOG4CPLUS_ERROR(getApplicationLogger(),
00911                             "Unable to create message service presence ");
00912           }
00913         } 
00914         catch(...) {
00915           LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
00916         }
00917         stopClassic(); // call the normal sequence of stopping - as this is allowed to fail provisions must be made ...@@@EM
00918         MsgBuf msg1(0,MSQS_MESSAGE_TYPE_STOP);
00919         myProcess_->postSlave(msg1,false);
00920         pthread_mutex_unlock(&stop_lock_);
00921         fclose(stdout);
00922         fclose(stderr);
00923         exit(EXIT_SUCCESS);
00924       }
00925     if(msg->mtype==MSQM_MESSAGE_TYPE_FSTOP)
00926       _exit(EXIT_SUCCESS);
00927   }
00928   catch(evf::Exception &e){}
00929   return true;
00930 }
00931 
00932 bool FUEventProcessor::supervisor(toolbox::task::WorkLoop *)
00933 {
00934   pthread_mutex_lock(&stop_lock_);
00935   if(subs_.size()!=nbSubProcesses_.value_)
00936     {
00937       subs_.resize(nbSubProcesses_.value_);
00938       spMStates_.resize(nbSubProcesses_.value_);
00939       spmStates_.resize(nbSubProcesses_.value_);
00940       for(unsigned int i = 0; i < spMStates_.size(); i++)
00941         {
00942           spMStates_[i] = edm::event_processor::sInit; 
00943           spmStates_[i] = 0; 
00944         }
00945     }
00946   bool running = fsm_.stateName()->toString()=="Enabled";
00947   bool stopping = fsm_.stateName()->toString()=="stopping";
00948   for(unsigned int i = 0; i < subs_.size(); i++)
00949     {
00950       if(subs_[i].alive()==-1000) continue;
00951       int sl;
00952 
00953       pid_t killedOrNot = waitpid(subs_[i].pid(),&sl,WNOHANG);
00954 
00955       if(killedOrNot==subs_[i].pid()) subs_[i].setStatus((WIFEXITED(sl) != 0 ? 0 : -1));
00956       else continue;
00957       pthread_mutex_lock(&pickup_lock_);
00958       std::ostringstream ost;
00959       if(subs_[i].alive()==0) ost << " process exited with status " << WEXITSTATUS(sl);
00960       else if(WIFSIGNALED(sl)!=0) ost << " process terminated with signal " << WTERMSIG(sl);
00961       else ost << " process stopped ";
00962       subs_[i].countdown()=slaveRestartDelaySecs_.value_;
00963       subs_[i].setReasonForFailed(ost.str());
00964       spMStates_[i] = evtProcessor_.notstarted_state_code();
00965       spmStates_[i] = 0;
00966       std::ostringstream ost1;
00967       ost1 << "-E- Slave " << subs_[i].pid() << ost.str();
00968       localLog(ost1.str());
00969       if(!autoRestartSlaves_.value_) subs_[i].disconnect();
00970       pthread_mutex_unlock(&pickup_lock_);
00971     }
00972   pthread_mutex_unlock(&stop_lock_);    
00973   if(stopping) return true; // if in stopping we are done
00974 
00975   if(running)
00976     {
00977       // if enabled, this loop will periodically check if dead slaves countdown has expired and restart them
00978       // this is only active while running, hence, the stop lock is acquired and only released at end of loop
00979       if(autoRestartSlaves_.value_){
00980         pthread_mutex_lock(&stop_lock_); //lockout slave killing at stop while you check for restarts
00981         for(unsigned int i = 0; i < subs_.size(); i++)
00982           {
00983             if(subs_[i].alive() != 1){
00984               if(subs_[i].countdown() == 0)
00985                 {
00986                   if(subs_[i].restartCount()>2){
00987                     LOG4CPLUS_WARN(getApplicationLogger()," Not restarting subprocess in slot " << i 
00988                                    << " - maximum restart count reached");
00989                     std::ostringstream ost1;
00990                     ost1 << "-W- Dead Process in slot " << i << " reached maximum restart count"; 
00991                     localLog(ost1.str());
00992                     subs_[i].countdown()--;
00993                     XCEPT_DECLARE(evf::Exception,
00994                                   sentinelException, ost1.str());
00995                     notifyQualified("error",sentinelException);
00996                     subs_[i].disconnect();
00997                     continue;
00998                   }
00999                   subs_[i].restartCount()++;
01000                   pid_t rr = subs_[i].forkNew();
01001                   if(rr==0)
01002                     {
01003                       myProcess_=&subs_[i];
01004                       scalersUpdates_ = 0;
01005                       int retval = pthread_mutex_destroy(&stop_lock_);
01006                       if(retval != 0) perror("error");
01007                       retval = pthread_mutex_init(&stop_lock_,0);
01008                       if(retval != 0) perror("error");
01009                       fsm_.disableRcmsStateNotification();
01010                       fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
01011                       fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition
01012                       fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition
01013                       try{
01014                         xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
01015                         if(lsid) {
01016                           ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died
01017                         }
01018                       }
01019                       catch(...){
01020                         std::cout << "trouble with lsindex during restart" << std::endl;
01021                       }
01022                       try{
01023                         xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
01024                         if(lstb) {
01025                           ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting
01026                         }
01027                       }
01028                       catch(...){
01029                         std::cout << "trouble with resetting flag for eol recovery " << std::endl;
01030                       }
01031 
01032                       evtProcessor_.adjustLsIndexForRestart();
01033                       evtProcessor_.resetPackedTriggerReport();
01034                       enableMPEPSlave();
01035                       return false; // exit the supervisor loop immediately in the child !!!
01036                     }
01037                   else
01038                     {
01039                       std::ostringstream ost1;
01040                       ost1 << "-I- New Process " << rr << " forked for slot " << i; 
01041                       localLog(ost1.str());
01042                     }
01043                 }
01044               if(subs_[i].countdown()>=0) subs_[i].countdown()--;
01045             }
01046           }
01047         pthread_mutex_unlock(&stop_lock_);
01048       } // finished handling replacement of dead slaves once they've been reaped
01049     }
01050   xdata::Serializable *lsid = 0; 
01051   xdata::Serializable *psid = 0;
01052   xdata::Serializable *epMAltState = 0; 
01053   xdata::Serializable *epmAltState = 0;
01054   xdata::Serializable *dqmp = 0;
01055   xdata::UnsignedInteger32 *dqm = 0;
01056 
01057 
01058   
01059   if(running){  
01060     try{
01061       lsid = applicationInfoSpace_->find("lumiSectionIndex");
01062       psid = applicationInfoSpace_->find("prescaleSetIndex");
01063       nbProcessed = monitorInfoSpace_->find("nbProcessed");
01064       nbAccepted  = monitorInfoSpace_->find("nbAccepted");
01065       epMAltState = monitorInfoSpace_->find("epSPMacroStateInt");
01066       epmAltState = monitorInfoSpace_->find("epSPMicroStateInt");
01067       dqmp = applicationInfoSpace_-> find("nbDqmUpdates");      
01068     }
01069     catch(xdata::exception::Exception e){
01070       LOG4CPLUS_INFO(getApplicationLogger(),"could not retrieve some data - " << e.what());    
01071     }
01072 
01073     try{
01074       if(nbProcessed !=0 && nbAccepted !=0)
01075         {
01076           xdata::UnsignedInteger32*nbp = ((xdata::UnsignedInteger32*)nbProcessed);
01077           xdata::UnsignedInteger32*nba = ((xdata::UnsignedInteger32*)nbAccepted);
01078           xdata::UnsignedInteger32*ls  = ((xdata::UnsignedInteger32*)lsid);
01079           xdata::UnsignedInteger32*ps  = ((xdata::UnsignedInteger32*)psid);
01080           if(dqmp!=0)
01081             dqm = (xdata::UnsignedInteger32*)dqmp;
01082           if(dqm) dqm->value_ = 0;
01083           nbTotalDQM_ = 0;
01084           nbp->value_ = 0;
01085           nba->value_ = 0;
01086           nblive_ = 0;
01087           nbdead_ = 0;
01088           scalersUpdates_ = 0;
01089 
01090           for(unsigned int i = 0; i < subs_.size(); i++)
01091             {
01092               if(subs_[i].alive()>0)
01093                 {
01094                   nblive_++;
01095                   try{
01096                     subs_[i].post(master_message_prg_,true);
01097                     
01098                     unsigned long retval = subs_[i].rcvNonBlocking(master_message_prr_,true);
01099                     if(retval == (unsigned long) master_message_prr_->mtype){
01100                       prg* p = (struct prg*)(master_message_prr_->mtext);
01101                       subs_[i].setParams(p);
01102                       spMStates_[i] = p->Ms;
01103                       spmStates_[i] = p->ms;
01104                       cpustat_->addEntry(p->ms);
01105                       if(!subs_[i].inInconsistentState() && 
01106                          (p->Ms == edm::event_processor::sError 
01107                           || p->Ms == edm::event_processor::sInvalid
01108                           || p->Ms == edm::event_processor::sStopping))
01109                         {
01110                           std::ostringstream ost;
01111                           ost << "edm::eventprocessor slot " << i << " process id " 
01112                               << subs_[i].pid() << " not in Running state : Mstate=" 
01113                               << evtProcessor_.stateNameFromIndex(p->Ms) << " mstate="
01114                               << evtProcessor_.moduleNameFromIndex(p->ms) 
01115                               << " - Look into possible error messages from HLT process";
01116                           LOG4CPLUS_WARN(getApplicationLogger(),ost.str());
01117                         }
01118                       nbp->value_ += subs_[i].params().nbp;
01119                       nba->value_  += subs_[i].params().nba;
01120                       if(dqm)dqm->value_ += p->dqm;
01121                       nbTotalDQM_ +=  p->dqm;
01122                       scalersUpdates_ += p->trp;
01123                       if(p->ls > ls->value_) ls->value_ = p->ls;
01124                       if(p->ps != ps->value_) ps->value_ = p->ps;
01125                     }
01126                     else{
01127                       nbp->value_ += subs_[i].get_save_nbp();
01128                       nba->value_ += subs_[i].get_save_nba();
01129                     }
01130                   } 
01131                   catch(evf::Exception &e){
01132                     LOG4CPLUS_INFO(getApplicationLogger(),
01133                                    "could not send/receive msg on slot " 
01134                                    << i << " - " << e.what());    
01135                   }
01136                     
01137                 }
01138               else
01139                 {
01140                   nbp->value_ += subs_[i].get_save_nbp();
01141                   nba->value_ += subs_[i].get_save_nba();
01142                   nbdead_++;
01143                 }
01144             }
01145           if(nbp->value_>64){//have some slaves already processed more than one event ? (eventually make this == number of raw cells)
01146             for(unsigned int i = 0; i < subs_.size(); i++)
01147               {
01148                 if(subs_[i].params().nbp == 0){ // a slave has processed 0 events 
01149                   // check that the process is not stuck
01150                   if(subs_[i].alive()>0 && subs_[i].params().ms == 0) // the process is seen alive but in us=Invalid(0)
01151                     {
01152                       subs_[i].found_invalid();//increase the "found_invalid" counter
01153                       if(subs_[i].nfound_invalid() > 60){ //wait x monitor cycles (~1 min a good time ?) before doing something about a stuck slave
01154                         MsgBuf msg3(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_FSTOP);      // send a force-stop signal             
01155                         subs_[i].post(msg3,false);
01156                         std::ostringstream ost1;
01157                         ost1 << "-W- Process in slot " << i << " Never reached the running state - forcestopping it"; 
01158                         localLog(ost1.str());
01159                         LOG4CPLUS_ERROR(getApplicationLogger(),ost1.str());    
01160                         XCEPT_DECLARE(evf::Exception,
01161                                       sentinelException, ost1.str());
01162                         notifyQualified("error",sentinelException);
01163 
01164                       }
01165                     }
01166                 }
01167               }
01168           }
01169         }
01170     }
01171     catch(std::exception &e){
01172       LOG4CPLUS_INFO(getApplicationLogger(),"std exception - " << e.what());    
01173     }
01174     catch(...){
01175       LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception ");    
01176     }
01177   }
01178   else{
01179     for(unsigned int i = 0; i < subs_.size(); i++)
01180       {
01181         if(subs_[i].alive()==-1000)
01182           {
01183             spMStates_[i] = edm::event_processor::sInit;
01184             spmStates_[i] = 0;
01185           }
01186       }
01187   }
01188   try{
01189     monitorInfoSpace_->lock();
01190     monitorInfoSpace_->fireItemGroupChanged(names_,0);
01191     monitorInfoSpace_->unlock();
01192   }
01193   catch(xdata::exception::Exception &e)
01194     {
01195       LOG4CPLUS_ERROR(log_, "Exception from fireItemGroupChanged: " << e.what());
01196       //        localLog(e.what());
01197     }
01198   ::sleep(superSleepSec_.value_);       
01199   return true;
01200 }
01201 
01202 void FUEventProcessor::startScalersWorkLoop() throw (evf::Exception)
01203 {
01204   try {
01205     wlScalers_=
01206       toolbox::task::getWorkLoopFactory()->getWorkLoop("Scalers",
01207                                                        "waiting");
01208     if (!wlScalers_->isActive()) wlScalers_->activate();
01209     asScalers_ = toolbox::task::bind(this,&FUEventProcessor::scalers,
01210                                      "Scalers");
01211     
01212   wlScalers_->submit(asScalers_);
01213   wlScalersActive_ = true;
01214   }
01215   catch (xcept::Exception& e) {
01216     std::string msg = "Failed to start workloop 'Scalers'.";
01217     XCEPT_RETHROW(evf::Exception,msg,e);
01218   }
01219 }
01220 
01221 //______________________________________________________________________________
01222 
01223 void FUEventProcessor::startSummarizeWorkLoop() throw (evf::Exception)
01224 {
01225   try {
01226     wlSummarize_=
01227       toolbox::task::getWorkLoopFactory()->getWorkLoop("Summary",
01228                                                        "waiting");
01229     if (!wlSummarize_->isActive()) wlSummarize_->activate();
01230     
01231     asSummarize_ = toolbox::task::bind(this,&FUEventProcessor::summarize,
01232                                        "Summary");
01233 
01234     wlSummarize_->submit(asSummarize_);
01235     wlSummarizeActive_ = true;
01236   }
01237   catch (xcept::Exception& e) {
01238     std::string msg = "Failed to start workloop 'Summarize'.";
01239     XCEPT_RETHROW(evf::Exception,msg,e);
01240   }
01241 }
01242 
01243 //______________________________________________________________________________
01244 
01245 bool FUEventProcessor::scalers(toolbox::task::WorkLoop* wl)
01246 {
01247   if(evtProcessor_)
01248     {
01249       if(!evtProcessor_.getTriggerReport(true)) {
01250         wlScalersActive_ = false;
01251         return false;
01252       }
01253     }
01254   else
01255     {
01256       std::cout << getpid()<< " Scalers workloop, bailing out, no evtProcessor " << std::endl;
01257       wlScalersActive_ = false;
01258       return false;
01259     }
01260   if(myProcess_) 
01261     {
01262       //      std::cout << getpid() << "going to post on control queue from scalers" << std::endl;
01263       int ret = myProcess_->postSlave(evtProcessor_.getPackedTriggerReport(),false);
01264       if(ret!=0)      std::cout << "scalers workloop, error posting to sqs_ " << errno << std::endl;
01265       scalersUpdates_++;
01266     }
01267   else
01268     evtProcessor_.fireScalersUpdate();
01269   return true;
01270 }
01271 
01272 //______________________________________________________________________________
01273 bool FUEventProcessor::summarize(toolbox::task::WorkLoop* wl)
01274 {
01275   evtProcessor_.resetPackedTriggerReport();
01276   bool atLeastOneProcessUpdatedSuccessfully = false;
01277   int msgCount = 0;
01278   for (unsigned int i = 0; i < subs_.size(); i++)
01279     {
01280       if(subs_[i].alive()>0)
01281         {
01282           int ret = 0;
01283           if(subs_[i].check_postponed_trigger_update(master_message_trr_,
01284                                                      evtProcessor_.getLumiSectionReferenceIndex()))
01285             {
01286               ret = MSQS_MESSAGE_TYPE_TRR;
01287               std::cout << "using postponed report from slot " << i << " for ls " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01288             }
01289           else{
01290             bool insync = false;
01291             bool exception_caught = false;
01292             while(!insync){
01293               try{
01294                 ret = subs_[i].rcv(master_message_trr_,false);
01295               }
01296               catch(evf::Exception &e)
01297                 {
01298                   std::cout << "exception in msgrcv on " << i 
01299                             << " " << subs_[i].alive() << " " << strerror(errno) << std::endl;
01300                   exception_caught = true;
01301                   break;
01302                   //do nothing special
01303                 }
01304               if(ret==MSQS_MESSAGE_TYPE_TRR) {
01305                 TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
01306                 if(trp->lumiSection >= evtProcessor_.getLumiSectionReferenceIndex()){
01307                   insync = true;
01308                 }
01309               }
01310             }
01311             if(exception_caught) continue;
01312           }
01313           msgCount++;
01314           if(ret==MSQS_MESSAGE_TYPE_TRR) {
01315             TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
01316             if(trp->lumiSection > evtProcessor_.getLumiSectionReferenceIndex()){
01317               std::cout << "postpone handling of msg from slot " << i << " with Ls " <<  trp->lumiSection
01318                         << " should be " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01319               subs_[i].add_postponed_trigger_update(master_message_trr_);
01320             }else{
01321               atLeastOneProcessUpdatedSuccessfully = true;
01322               evtProcessor_.sumAndPackTriggerReport(master_message_trr_);
01323             }
01324           }
01325           else std::cout << "msgrcv returned error " << errno << std::endl;
01326         }
01327     }
01328   if(atLeastOneProcessUpdatedSuccessfully){
01329     nbSubProcessesReporting_.value_ = msgCount;
01330     evtProcessor_.updateRollingReport();
01331     evtProcessor_.fireScalersUpdate();
01332   }
01333   else{
01334     LOG4CPLUS_WARN(getApplicationLogger(),"Summarize loop: no process updated successfully - sleep 10 seconds before trying again");          
01335     if(msgCount==0) evtProcessor_.withdrawLumiSectionIncrement();
01336     nbSubProcessesReporting_.value_ = 0;
01337     ::sleep(10);
01338   }
01339   if(fsm_.stateName()->toString()!="Enabled"){
01340     wlScalersActive_ = false;
01341     return false;
01342   }
01343   //  cpustat_->printStat();
01344   if(iDieStatisticsGathering_.value_){
01345     try{
01346       cpustat_ ->sendStat(evtProcessor_.getLumiSectionReferenceIndex());
01347       ratestat_->sendStat((unsigned char*)(evtProcessor_.getPackedTriggerReportAsStruct()),
01348                           sizeof(TriggerReportStatic),
01349                           evtProcessor_.getLumiSectionReferenceIndex());
01350     }catch(evf::Exception &e){
01351       LOG4CPLUS_INFO(getApplicationLogger(),"coud not send statistics"
01352                      << e.what());
01353     }
01354   }
01355   cpustat_->reset();
01356   return true;
01357 }
01358 
01359 
01360 
01361 bool FUEventProcessor::receivingAndMonitor(toolbox::task::WorkLoop *)
01362 {
01363   try{
01364     myProcess_->rcvSlave(slave_message_monitoring_,true); //will receive only messages from Master
01365     switch(slave_message_monitoring_->mtype)
01366       {
01367       case MSQM_MESSAGE_TYPE_MCS:
01368         {
01369           xgi::Input *in = 0;
01370           xgi::Output out;
01371           evtProcessor_.microState(in,&out);
01372           MsgBuf msg1(out.str().size(),MSQS_MESSAGE_TYPE_MCR);
01373           strncpy(msg1->mtext,out.str().c_str(),out.str().size());
01374           myProcess_->postSlave(msg1,true);
01375           break;
01376         }
01377       
01378       case MSQM_MESSAGE_TYPE_PRG:
01379         {
01380           xdata::Serializable *dqmp = 0;
01381           xdata::UnsignedInteger32 *dqm = 0;
01382           evtProcessor_.monitoring(0);
01383           try{
01384             dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
01385           }  catch(xdata::exception::Exception e){}
01386           if(dqmp!=0)
01387             dqm = (xdata::UnsignedInteger32*)dqmp;
01388 
01389           //      monitorInfoSpace_->lock();  
01390           prg * data           = (prg*)slave_message_prr_->mtext;
01391           data->ls             = evtProcessor_.lsid_;
01392           data->eols           = evtProcessor_.lastLumiUsingEol_;
01393           data->ps             = evtProcessor_.psid_;
01394           data->nbp            = evtProcessor_->totalEvents();
01395           data->nba            = evtProcessor_->totalEventsPassed();
01396           data->Ms             = evtProcessor_.epMAltState_.value_;
01397           data->ms             = evtProcessor_.epmAltState_.value_;
01398           if(dqm) data->dqm    = dqm->value_; else data->dqm = 0;
01399           data->trp            = scalersUpdates_;
01400           //      monitorInfoSpace_->unlock();  
01401           myProcess_->postSlave(slave_message_prr_,true);
01402           if(exitOnError_.value_)
01403           { 
01404             // after each monitoring cycle check if we are in inconsistent state and exit if configured to do so  
01405             //      std::cout << getpid() << "receivingAndMonitor: trying to acquire stop lock " << std::endl;
01406             if(data->Ms == edm::event_processor::sStopping || data->Ms == edm::event_processor::sError) 
01407               { 
01408                 bool running = true;
01409                 int count = 0;
01410                 while(running){
01411                   int retval = pthread_mutex_lock(&stop_lock_);
01412                   if(retval != 0) perror("error");
01413                   running = fsm_.stateName()->toString()=="Enabled";
01414                   if(count>5) _exit(-1);
01415                   pthread_mutex_unlock(&stop_lock_);
01416                   if(running) {::sleep(1); count++;}
01417                 }
01418               }
01419             
01420           }
01421           //      scalersUpdates_++;
01422           break;
01423         }
01424       case MSQM_MESSAGE_TYPE_WEB:
01425         {
01426           xgi::Input  *in = 0;
01427           xgi::Output out;
01428           unsigned int bytesToSend = 0;
01429           MsgBuf msg1(NUMERIC_MESSAGE_SIZE,MSQS_MESSAGE_TYPE_WEB);
01430           std::string query = slave_message_monitoring_->mtext;
01431           size_t pos = query.find_first_of("&");
01432           std::string method;
01433           std::string args;
01434           if(pos!=std::string::npos)  
01435             {
01436               method = query.substr(0,pos);
01437               args = query.substr(pos+1,query.length()-pos-1);
01438             }
01439           else
01440             method=query;
01441 
01442           if(method=="Spotlight")
01443             {
01444               spotlightWebPage(in,&out);
01445             }
01446           else if(method=="procStat")
01447             {
01448               procStat(in,&out);
01449             }
01450           else if(method=="moduleWeb")
01451             {
01452               internal::MyCgi mycgi;
01453               boost::char_separator<char> sep(";");
01454               boost::tokenizer<boost::char_separator<char> > tokens(args, sep);
01455               for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
01456                    tok_iter != tokens.end(); ++tok_iter){
01457                 size_t pos = (*tok_iter).find_first_of("%");
01458                 if(pos != std::string::npos){
01459                   std::string first  = (*tok_iter).substr(0    ,                        pos);
01460                   std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
01461                   mycgi.getEnvironment()[first]=second;
01462                 }
01463               }
01464               moduleWeb(&mycgi,&out);
01465             }
01466           else if(method=="Default")
01467             {
01468               defaultWebPage(in,&out);
01469             }
01470           else 
01471             {
01472               out << "Error 404!!!!!!!!" << std::endl;
01473             }
01474 
01475 
01476           bytesToSend = out.str().size();
01477           unsigned int cycle = 0;
01478           if(bytesToSend==0)
01479             {
01480               snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", bytesToSend);
01481               myProcess_->postSlave(msg1,true);
01482             }
01483           while(bytesToSend !=0){
01484             unsigned int msgSize = bytesToSend>MAX_PIPE_BUFFER_SIZE ? MAX_PIPE_BUFFER_SIZE : bytesToSend;
01485             snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", msgSize);
01486             write(anonymousPipe_[PIPE_WRITE],
01487                   out.str().c_str()+MAX_PIPE_BUFFER_SIZE*cycle,
01488                   msgSize);
01489             myProcess_->postSlave(msg1,true);
01490             bytesToSend -= msgSize;
01491             cycle++;
01492           }
01493           break;
01494         }
01495       case MSQM_MESSAGE_TYPE_TRP:
01496         {
01497           break;
01498         }
01499       }
01500   }
01501   catch(evf::Exception &e){std::cout << "exception caught in recevingM: " << e.what() << std::endl;}
01502   return true;
01503 }
01504 
01505 bool FUEventProcessor::enableCommon()
01506 {
01507   try {    
01508     if(hasShMem_) attachDqmToShm();
01509     int sc = 0;
01510     evtProcessor_->clearCounters();
01511     if(isRunNumberSetter_)
01512       evtProcessor_->setRunNumber(runNumber_.value_);
01513     else
01514       evtProcessor_->declareRunNumber(runNumber_.value_);
01515     try{
01516       ::sleep(1);
01517       evtProcessor_->runAsync();
01518       sc = evtProcessor_->statusAsync();
01519     }
01520     catch(cms::Exception &e) {
01521       reasonForFailedState_ = e.explainSelf();
01522       fsm_.fireFailed(reasonForFailedState_,this);
01523       localLog(reasonForFailedState_);
01524       return false;
01525     }    
01526     catch(std::exception &e) {
01527       reasonForFailedState_  = e.what();
01528       fsm_.fireFailed(reasonForFailedState_,this);
01529       localLog(reasonForFailedState_);
01530       return false;
01531     }
01532     catch(...) {
01533       reasonForFailedState_ = "Unknown Exception";
01534       fsm_.fireFailed(reasonForFailedState_,this);
01535       localLog(reasonForFailedState_);
01536       return false;
01537     }
01538     if(sc != 0) {
01539       std::ostringstream oss;
01540       oss<<"EventProcessor::runAsync returned status code " << sc;
01541       reasonForFailedState_ = oss.str();
01542       fsm_.fireFailed(reasonForFailedState_,this);
01543       localLog(reasonForFailedState_);
01544       return false;
01545     }
01546   }
01547   catch (xcept::Exception &e) {
01548     reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
01549     fsm_.fireFailed(reasonForFailedState_,this);
01550     localLog(reasonForFailedState_);
01551     return false;
01552   }
01553   try{
01554     fsm_.fireEvent("EnableDone",this);
01555   }
01556   catch (xcept::Exception &e) {
01557     std::cout << "exception " << (std::string)e.what() << std::endl;
01558     throw;
01559   }
01560 
01561   return false;
01562 }
01563   
01564 bool FUEventProcessor::enableClassic()
01565 {
01566   bool retval = enableCommon();
01567   while(evtProcessor_->getState()!= edm::event_processor::sRunning){
01568     LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
01569     ::sleep(1);
01570   }
01571   
01572   //  implementation moved to EPWrapper
01573   //  startScalersWorkLoop(); // this is now not done any longer 
01574   localLog("-I- Start completed");
01575   return retval;
01576 }
01577 bool FUEventProcessor::enableMPEPSlave()
01578 {
01579   //all this happens only in the child process
01580   startReceivingLoop();
01581   startReceivingMonitorLoop();
01582   evtProcessor_.resetWaiting();
01583   startScalersWorkLoop();
01584   while(!evtProcessor_.isWaitingForLs())
01585     ::sleep(1);
01586   // @EM test do not run monitor loop in slave, only receiving&Monitor
01587   //  evtProcessor_.startMonitoringWorkLoop();
01588   try{
01589     //    evtProcessor_.makeServicesOnly();
01590     try{
01591       LOG4CPLUS_DEBUG(getApplicationLogger(),
01592                       "Trying to create message service presence ");
01593       edm::PresenceFactory *pf = edm::PresenceFactory::get();
01594       if(pf != 0) {
01595         pf->makePresence("MessageServicePresence").release();
01596       }
01597       else {
01598         LOG4CPLUS_ERROR(getApplicationLogger(),
01599                         "Unable to create message service presence ");
01600       }
01601     } 
01602     catch(...) {
01603       LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
01604     }
01605   ML::MLlog4cplus::setAppl(this);      
01606   }       
01607   catch (xcept::Exception &e) {
01608     reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
01609     fsm_.fireFailed(reasonForFailedState_,this);
01610     localLog(reasonForFailedState_);
01611   }
01612   bool retval =  enableCommon();
01613   //  while(evtProcessor_->getState()!= edm::event_processor::sRunning){
01614   //    LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
01615   //    ::sleep(1);
01616   //  }
01617   return retval;
01618 }
01619 
01620 bool FUEventProcessor::stopClassic()
01621 {
01622   try {
01623     LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping :) ...");
01624     edm::EventProcessor::StatusCode rc = evtProcessor_.stop();
01625     if(rc == edm::EventProcessor::epSuccess) 
01626       fsm_.fireEvent("StopDone",this);
01627     else
01628       {
01629         //      epMState_ = evtProcessor_->currentStateName();
01630         if(rc == edm::EventProcessor::epTimedOut)
01631           reasonForFailedState_ = "EventProcessor stop timed out";
01632         else
01633           reasonForFailedState_ = "EventProcessor did not receive STOP event";
01634         fsm_.fireFailed(reasonForFailedState_,this);
01635         localLog(reasonForFailedState_);
01636       }
01637     if(hasShMem_) detachDqmFromShm();
01638   }
01639   catch (xcept::Exception &e) {
01640     reasonForFailedState_ = "stopping FAILED: " + (std::string)e.what();
01641     localLog(reasonForFailedState_);
01642     fsm_.fireFailed(reasonForFailedState_,this);
01643   }
01644   LOG4CPLUS_INFO(getApplicationLogger(),"Finished stopping!");
01645   localLog("-I- Stop completed");
01646   return false;
01647 }
01648 
01649 void FUEventProcessor::stopSlavesAndAcknowledge()
01650 {
01651   MsgBuf msg(0,MSQM_MESSAGE_TYPE_STOP);
01652   MsgBuf msg1(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_STOP);
01653 
01654   std::vector<bool> processes_to_stop(nbSubProcesses_.value_,false);
01655   for(unsigned int i = 0; i < subs_.size(); i++)
01656     {
01657       pthread_mutex_lock(&stop_lock_);
01658       if(subs_[i].alive()>0){
01659         processes_to_stop[i] = true;
01660         subs_[i].post(msg,false);
01661       }
01662       pthread_mutex_unlock(&stop_lock_);
01663     }
01664   for(unsigned int i = 0; i < subs_.size(); i++)
01665     {
01666       pthread_mutex_lock(&stop_lock_);
01667       if(processes_to_stop[i]){
01668         try{
01669           subs_[i].rcv(msg1,false);
01670         }
01671         catch(evf::Exception &e){
01672           std::ostringstream ost;
01673           ost << "failed to get STOP - errno ->" << errno << " " << e.what(); 
01674           reasonForFailedState_ = ost.str();
01675           LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
01676           //      fsm_.fireFailed(reasonForFailedState_,this);
01677           localLog(reasonForFailedState_);
01678           break;
01679         }
01680       }
01681       else {
01682         pthread_mutex_unlock(&stop_lock_);
01683         continue;
01684       }
01685       pthread_mutex_unlock(&stop_lock_);
01686       if(msg1->mtype==MSQS_MESSAGE_TYPE_STOP)
01687         while(subs_[i].alive()>0) ::usleep(10000);
01688       subs_[i].disconnect();
01689     }
01690   //  subs_.clear();
01691 
01692 }
01693 
01694 void FUEventProcessor::microState(xgi::Input *in,xgi::Output *out)
01695 {
01696   std::string urn = getApplicationDescriptor()->getURN();
01697   try{
01698     evtProcessor_.stateNameFromIndex(0);
01699     evtProcessor_.moduleNameFromIndex(0);
01700   if(myProcess_) {std::cout << "microstate called for child! bail out" << std::endl; return;}
01701   *out << "<tr><td>" << fsm_.stateName()->toString() 
01702        << "</td><td>"<< (myProcess_ ? "S" : "M") <<"</td><td>" << nblive_ << "</td><td>"
01703        << nbdead_ << "</td><td><a href=\"/" << urn << "/procStat\">" << getpid() <<"</a></td>";
01704   evtProcessor_.microState(in,out);
01705   *out << "<td></td><td>" << nbTotalDQM_ 
01706        << "</td><td>" << evtProcessor_.getScalersUpdates() << "</td></tr>";
01707   if(nbSubProcesses_.value_!=0 && !myProcess_) 
01708     {
01709       pthread_mutex_lock(&start_lock_);
01710       for(unsigned int i = 0; i < subs_.size(); i++)
01711         {
01712           try{
01713             if(subs_[i].alive()>0)
01714               {
01715                 *out << "<tr><td  bgcolor=\"#00FF00\" id=\"a"
01716                      << i << "\">""Alive</td><td>S</td><td>"
01717                      << subs_[i].queueId() << "<td>" 
01718                      << subs_[i].queueStatus()<< "/"
01719                      << subs_[i].queueOccupancy() << "/"
01720                      << subs_[i].queuePidOfLastSend() << "/"
01721                      << subs_[i].queuePidOfLastReceive() 
01722                      << "</td><td><a id=\"p"<< i << "\" href=\"SubWeb?process=" 
01723                      << subs_[i].pid() << "&method=procStat\">" 
01724                      << subs_[i].pid()<<"</a></td>" //<< msg->mtext;
01725                      << "<td>" << evtProcessor_.stateNameFromIndex(subs_[i].params().Ms) << "</td><td>" 
01726                      << evtProcessor_.moduleNameFromIndex(subs_[i].params().ms) << "</td><td>" 
01727                      << subs_[i].params().nba << "/" << subs_[i].params().nbp 
01728                      << " (" << float(subs_[i].params().nba)/float(subs_[i].params().nbp)*100. <<"%)" 
01729                      << "</td><td>" << subs_[i].params().ls  << "/" << subs_[i].params().ls 
01730                      << "</td><td>" << subs_[i].params().ps 
01731                      << "</td><td" 
01732                      << ((subs_[i].params().eols<subs_[i].params().ls) ? " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")  << ">" 
01733                      << subs_[i].params().eols  
01734                      << "</td><td>" << subs_[i].params().dqm 
01735                      << "</td><td>" << subs_[i].params().trp << "</td>";
01736               }
01737             else 
01738               {
01739                 pthread_mutex_lock(&pickup_lock_);
01740                 *out << "<tr><td id=\"a"<< i << "\" ";
01741                 if(subs_[i].alive()==-1000)
01742                   *out << " bgcolor=\"#bbaabb\">NotInitialized";
01743                 else
01744                   *out << (subs_[i].alive()==0 ? ">Done" : " bgcolor=\"#FF0000\">Dead");
01745                 *out << "</td><td>S</td><td>"<< subs_[i].queueId() << "<td>" 
01746                      << subs_[i].queueStatus() << "/"
01747                      << subs_[i].queueOccupancy() << "/"
01748                      << subs_[i].queuePidOfLastSend() << "/"
01749                      << subs_[i].queuePidOfLastReceive() 
01750                      << "</td><td id=\"p"<< i << "\">"
01751                      <<subs_[i].pid()<<"</td><td colspan=\"5\">" << subs_[i].reasonForFailed();
01752                 if(subs_[i].alive()!=0 && subs_[i].alive()!=-1000) 
01753                   {
01754                     if(autoRestartSlaves_ && subs_[i].restartCount()<=2) 
01755                       *out << " will restart in " << subs_[i].countdown() << " s";
01756                     else if(autoRestartSlaves_)
01757                       *out << " reached maximum restart count";
01758                     else *out << " autoRestart is disabled ";
01759                   }
01760                 *out << "</td><td" 
01761                      << ((subs_[i].params().eols<subs_[i].params().ls) ? 
01762                          " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")  
01763                      << ">" 
01764                      << subs_[i].params().eols  
01765                      << "</td><td>" << subs_[i].params().dqm 
01766                      << "</td><td>" << subs_[i].params().trp << "</td>";
01767                 pthread_mutex_unlock(&pickup_lock_);
01768               }
01769             *out << "</tr>";
01770           }
01771           catch(evf::Exception &e){
01772             *out << "<tr><td id=\"a"<< i << "\" " 
01773                  <<"bgcolor=\"#FFFF00\">NotResp</td><td>S</td><td>"<< subs_[i].queueId() << "<td>" 
01774                  << subs_[i].queueStatus() << "/"
01775                  << subs_[i].queueOccupancy() << "/"
01776                  << subs_[i].queuePidOfLastSend() << "/"
01777                  << subs_[i].queuePidOfLastReceive() 
01778                  << "</td><td id=\"p"<< i << "\">"
01779                  <<subs_[i].pid()<<"</td>";
01780           }
01781         }
01782       pthread_mutex_unlock(&start_lock_); 
01783     }
01784   }
01785       catch(evf::Exception &e)
01786       {
01787         LOG4CPLUS_INFO(getApplicationLogger(),"evf::Exception caught in microstate - " << e.what());    
01788       }
01789     catch(cms::Exception &e)
01790       {
01791         LOG4CPLUS_INFO(getApplicationLogger(),"cms::Exception caught in microstate - " << e.what());    
01792       }
01793     catch(std::exception &e)
01794       {
01795         LOG4CPLUS_INFO(getApplicationLogger(),"std::Exception caught in microstate - " << e.what());    
01796       }
01797     catch(...)
01798       {
01799         LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception caught in microstate - ");    
01800       }
01801 
01802 }
01803 
01804 
01805 void FUEventProcessor::updater(xgi::Input *in,xgi::Output *out)
01806 {
01807   using namespace utils;
01808 
01809   *out << updaterStatic_;
01810   mDiv(out,"loads");
01811   uptime(out);
01812   cDiv(out);
01813   mDiv(out,"st",fsm_.stateName()->toString());
01814   mDiv(out,"ru",runNumber_.toString());
01815   mDiv(out,"nsl",nbSubProcesses_.value_);
01816   mDiv(out,"nsr",nbSubProcessesReporting_.value_);
01817   mDiv(out,"cl");
01818   *out << getApplicationDescriptor()->getClassName() 
01819        << (nbSubProcesses_.value_ > 0 ? "MP " : " ");
01820   cDiv(out);
01821   mDiv(out,"in",getApplicationDescriptor()->getInstance());
01822   if(fsm_.stateName()->toString() != "Halted" && fsm_.stateName()->toString() != "halting"){
01823     mDiv(out,"hlt");
01824     *out << "<a href=\"" << configString_.toString() << "\">HLT Config</a>";
01825     cDiv(out);
01826     *out << std::endl;
01827   }
01828   else
01829     mDiv(out,"hlt","Not yet...");
01830 
01831   mDiv(out,"sq",squidPresent_.toString());
01832   mDiv(out,"vwl",(supervising_ ? "Active" : "Not Initialized"));
01833   mDiv(out,"mwl",evtProcessor_.wlMonitoring());
01834   if(nbProcessed != 0 && nbAccepted != 0)
01835     {
01836       mDiv(out,"tt",((xdata::UnsignedInteger32*)nbProcessed)->value_);
01837       mDiv(out,"ac",((xdata::UnsignedInteger32*)nbAccepted)->value_);
01838     }
01839   else
01840     {
01841       mDiv(out,"tt",0);
01842       mDiv(out,"ac",0);
01843     }
01844   if(!myProcess_)
01845     mDiv(out,"swl",(wlSummarizeActive_ ? "Active" : "Inactive"));
01846   else
01847     mDiv(out,"swl",(wlScalersActive_ ? "Active" : "Inactive"));
01848 
01849   mDiv(out,"idi",iDieUrl_.value_);
01850   if(vp_!=0){
01851     mDiv(out,"vpi",(unsigned int) vp_);
01852     if(vulture_->hasStarted()>=0)
01853       mDiv(out,"vul","Prowling");
01854     else
01855       mDiv(out,"vul","Dead");
01856   }
01857   else{
01858     mDiv(out,"vul",(vulture_==0 ? "Nope" : "Hatching"));
01859   }    
01860   if(evtProcessor_){
01861     mDiv(out,"ll");
01862     *out << evtProcessor_.lastLumi().ls
01863          << "," << evtProcessor_.lastLumi().proc << "," << evtProcessor_.lastLumi().acc;
01864     cDiv(out);
01865   }
01866   mDiv(out,"lg");
01867   for(unsigned int i = logRingIndex_; i<logRingSize_; i++)
01868     *out << logRing_[i] << std::endl;
01869   if(logWrap_)
01870     for(unsigned int i = 0; i<logRingIndex_; i++)
01871       *out << logRing_[i] << std::endl;
01872   cDiv(out);
01873   
01874 }
01875 
01876 void FUEventProcessor::procStat(xgi::Input *in, xgi::Output *out)
01877 {
01878   evf::utils::procStat(out);
01879 }
01880 
01881 void FUEventProcessor::sendMessageOverMonitorQueue(MsgBuf &buf)
01882 {
01883   if(myProcess_) myProcess_->postSlave(buf,true);
01884 }
01885 
01886 void FUEventProcessor::makeStaticInfo()
01887 {
01888   using namespace utils;
01889   std::ostringstream ost;
01890   mDiv(&ost,"ve");
01891   ost<< "$Revision: 1.132 $ (" << edm::getReleaseVersion() <<")";
01892   cDiv(&ost);
01893   mDiv(&ost,"ou",outPut_.toString());
01894   mDiv(&ost,"sh",hasShMem_.toString());
01895   mDiv(&ost,"mw",hasModuleWebRegistry_.toString());
01896   mDiv(&ost,"sw",hasServiceWebRegistry_.toString());
01897   
01898   xdata::Serializable *monsleep = 0;
01899   xdata::Serializable *lstimeout = 0;
01900   try{
01901     monsleep  = applicationInfoSpace_->find("monSleepSec");
01902     lstimeout = applicationInfoSpace_->find("lsTimeOut");
01903   }
01904   catch(xdata::exception::Exception e){
01905   }
01906   
01907   if(monsleep!=0)
01908     mDiv(&ost,"ms",monsleep->toString());
01909   if(lstimeout!=0)
01910     mDiv(&ost,"lst",lstimeout->toString());
01911   char cbuf[sizeof(struct utsname)];
01912   struct utsname* buf = (struct utsname*)cbuf;
01913   uname(buf);
01914   mDiv(&ost,"sysinfo");
01915   ost << buf->sysname << " " << buf->nodename 
01916       << " " << buf->release << " " << buf->version << " " << buf->machine;
01917   cDiv(&ost);
01918   updaterStatic_ = ost.str();
01919 }
01920 
// Expands the XDAQ instantiator macro for evf::FUEventProcessor.
// NOTE(review): presumably this generates the hook through which the XDAQ
// framework creates the application object - confirm against the XDAQ headers.
01921 XDAQ_INSTANTIATOR_IMPL(evf::FUEventProcessor)