CMS 3D CMS Logo

CMSSW_4_4_3_patch1/src/EventFilter/Processor/src/FUEventProcessor.cc

Go to the documentation of this file.
00001 
00002 //
00003 // FUEventProcessor
00004 // ----------------
00005 //
00007 
00008 #include "FUEventProcessor.h"
00009 #include "procUtils.h"
00010 #include "EventFilter/Utilities/interface/CPUStat.h"
00011 #include "EventFilter/Utilities/interface/RateStat.h"
00012 
00013 #include "EventFilter/Utilities/interface/Exception.h"
00014 
00015 #include "EventFilter/Message2log4cplus/interface/MLlog4cplus.h"
00016 #include "EventFilter/Modules/interface/FUShmDQMOutputService.h"
00017 #include "EventFilter/Utilities/interface/ServiceWebRegistry.h"
00018 #include "EventFilter/Utilities/interface/ServiceWeb.h"
00019 
00020 
00021 #include "FWCore/PluginManager/interface/ProblemTracker.h"
00022 #include "FWCore/PluginManager/interface/PresenceFactory.h"
00023 #include "FWCore/Utilities/interface/Presence.h"
00024 #include "FWCore/Utilities/interface/Exception.h"
00025 #include "FWCore/Version/interface/GetReleaseVersion.h"
00026 #include "FWCore/ServiceRegistry/interface/Service.h"
00027 
00028 #include "toolbox/BSem.h" 
00029 
00030 #include <boost/tokenizer.hpp>
00031 
00032 #include "xcept/tools.h"
00033 #include "xgi/Method.h"
00034 
00035 #include "cgicc/CgiDefs.h"
00036 #include "cgicc/Cgicc.h"
00037 #include "cgicc/FormEntry.h"
00038 
00039 
00040 #include <sys/wait.h>
00041 #include <sys/utsname.h>
00042 
00043 #include <typeinfo>
00044 #include <stdlib.h>
00045 #include <stdio.h>
00046 #include <errno.h>
00047 
00048 using namespace evf;
00049 using namespace cgicc;
      // Declaration of a binary-semaphore pointer that lives in the toolbox::mem
      // namespace and is defined outside this file. enabling() deletes and
      // re-creates the object it points to in every forked child process.
00050 namespace toolbox {
00051   namespace mem {
00052     extern toolbox::BSem * _s_mutex_ptr_; 
00053   }
00054 }
00055 
00057 // construction/destruction
00059 
00060 //______________________________________________________________________________
      // Constructor.
      //
      // @param s  XDAQ application stub, forwarded to the xdaq::Application base.
      //
      // In order, the body: records the names of the exported monitor items,
      // creates the pipe used for web communication, checks for a squid proxy,
      // hands the application descriptor/context to the event-processor wrapper,
      // initializes the state machine, publishes configuration and monitoring
      // items to the application / monitor / scalers infospaces, binds the web
      // callbacks, creates the message-service presence, looks up the
      // "RCMSStateListener" and "evf::iDie" application descriptors, initializes
      // the three pthread mutexes, starts the supervisor loop and allocates the
      // Vulture helper.
00061 FUEventProcessor::FUEventProcessor(xdaq::ApplicationStub *s) 
00062   : xdaq::Application(s)
00063   , fsm_(this)
00064   , log_(getApplicationLogger())
00065   , evtProcessor_(log_, getApplicationDescriptor()->getInstance())
00066   , runNumber_(0)
00067   , epInitialized_(false)
00068   , outPut_(true)
00069   , autoRestartSlaves_(false)
00070   , slaveRestartDelaySecs_(10)
00071   , hasShMem_(true)
00072   , hasPrescaleService_(true)
00073   , hasModuleWebRegistry_(true)
00074   , hasServiceWebRegistry_(true)
00075   , isRunNumberSetter_(true)
00076   , iDieStatisticsGathering_(false)
00077   , outprev_(true)
00078   , exitOnError_(true)
00079   , reasonForFailedState_()
00080   , squidnet_(3128,"https://localhost:8000/RELEASE-NOTES.txt")
00081   , logRing_(logRingSize_)
00082   , logRingIndex_(logRingSize_)
00083   , logWrap_(false)
00084   , nbSubProcesses_(0)
00085   , nbSubProcessesReporting_(0)
00086   , nblive_(0)
00087   , nbdead_(0)
00088   , nbTotalDQM_(0)
00089   , wlReceiving_(0)
00090   , asReceiveMsgAndExecute_(0)
00091   , receiving_(false) 
00092   , wlReceivingMonitor_(0)
00093   , asReceiveMsgAndRead_(0)
00094   , receivingM_(false)
00095   , myProcess_(0)
00096   , wlSupervising_(0)
00097   , asSupervisor_(0)
00098   , supervising_(false)
00099   , monitorInfoSpace_(0)
00100   , monitorLegendaInfoSpace_(0)
00101   , applicationInfoSpace_(0)
00102   , nbProcessed(0)
00103   , nbAccepted(0)
00104   , scalersInfoSpace_(0)
00105   , scalersLegendaInfoSpace_(0)
00106   , wlScalers_(0)
00107   , asScalers_(0)
00108   , wlScalersActive_(false)
00109   , scalersUpdates_(0)
00110   , wlSummarize_(0)
00111   , asSummarize_(0)
00112   , wlSummarizeActive_(false)
00113   , superSleepSec_(1)
00114   , iDieUrl_("none")
00115   , vulture_(0)
00116   , vp_(0)
00117   , cpustat_(0)
00118   , ratestat_(0)
00119   , master_message_prg_(0,MSQM_MESSAGE_TYPE_PRG)
00120   , master_message_prr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_PRR)
00121   , slave_message_prr_(sizeof(prg),MSQS_MESSAGE_TYPE_PRR)
00122   , master_message_trr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_TRR)
00123 {
00124   using namespace utils;
00125 
00126   names_.push_back("nbProcessed"    );
00127   names_.push_back("nbAccepted"     );
00128   names_.push_back("epMacroStateInt");
00129   names_.push_back("epMicroStateInt");
00130   // create pipe for web communication
        // NOTE(review): a failure is only logged; construction continues and
        // anonymousPipe_ is left as it was.
00131   int retpipe = pipe(anonymousPipe_);
00132   if(retpipe != 0)
00133         LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to create pipe");
00134   // check squid presence
00135   squidPresent_ = squidnet_.check();
00136   //pass application parameters to FWEPWrapper
00137   evtProcessor_.setAppDesc(getApplicationDescriptor());
00138   evtProcessor_.setAppCtxt(getApplicationContext());
00139   // bind relevant callbacks to finite state machine
00140   fsm_.initialize<evf::FUEventProcessor>(this);
00141   
00142   //set sourceId_
        // url_ is "<context URL>/<application URN>"; sourceId_ is "<class>_<instance>".
00143   url_ =
00144     getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00145     getApplicationDescriptor()->getURN();
00146   class_   =getApplicationDescriptor()->getClassName();
00147   instance_=getApplicationDescriptor()->getInstance();
00148   sourceId_=class_.toString()+"_"+instance_.toString();
00149   LOG4CPLUS_INFO(getApplicationLogger(),sourceId_ <<" constructor"         );
        // getenv() returns a null pointer when the variable is not set; inserting a
        // null char* into a stream is undefined behaviour, so log a placeholder.
        const char *cmsswBase = getenv("CMSSW_BASE");
00150   LOG4CPLUS_INFO(getApplicationLogger(),"CMSSW_BASE:"<<(cmsswBase!=0 ? cmsswBase : "(not set)"));
00151   
00152   getApplicationDescriptor()->setAttribute("icon", "/evf/images/epicon.jpg");
00153   
00154   xdata::InfoSpace *ispace = getApplicationInfoSpace();
00155   applicationInfoSpace_ = ispace;
00156 
00157   // default configuration
        // Each call below publishes one member (or state-machine item) in the
        // application infospace under the given name.
00158   ispace->fireItemAvailable("parameterSet",         &configString_                );
00159   ispace->fireItemAvailable("epInitialized",        &epInitialized_               );
00160   ispace->fireItemAvailable("stateName",             fsm_.stateName()             );
00161   ispace->fireItemAvailable("runNumber",            &runNumber_                   );
00162   ispace->fireItemAvailable("outputEnabled",        &outPut_                      );
00163 
00164   ispace->fireItemAvailable("hasSharedMemory",      &hasShMem_);
00165   ispace->fireItemAvailable("hasPrescaleService",   &hasPrescaleService_          );
00166   ispace->fireItemAvailable("hasModuleWebRegistry", &hasModuleWebRegistry_        );
00167   ispace->fireItemAvailable("hasServiceWebRegistry", &hasServiceWebRegistry_      );
00168   ispace->fireItemAvailable("isRunNumberSetter",    &isRunNumberSetter_           );
00169   ispace->fireItemAvailable("iDieStatisticsGathering",   &iDieStatisticsGathering_);
00170   ispace->fireItemAvailable("rcmsStateListener",     fsm_.rcmsStateListener()     );
00171   ispace->fireItemAvailable("foundRcmsStateListener",fsm_.foundRcmsStateListener());
00172   ispace->fireItemAvailable("nbSubProcesses",       &nbSubProcesses_              );
00173   ispace->fireItemAvailable("nbSubProcessesReporting",&nbSubProcessesReporting_   );
00174   ispace->fireItemAvailable("superSleepSec",        &superSleepSec_               );
00175   ispace->fireItemAvailable("autoRestartSlaves",    &autoRestartSlaves_           );
00176   ispace->fireItemAvailable("slaveRestartDelaySecs",&slaveRestartDelaySecs_       );
00177   ispace->fireItemAvailable("iDieUrl",              &iDieUrl_                     );
00178   
00179   // Add infospace listeners for exporting data values
        // Changes to these two items are delivered to actionPerformed().
00180   getApplicationInfoSpace()->addItemChangedListener("parameterSet",        this);
00181   getApplicationInfoSpace()->addItemChangedListener("outputEnabled",       this);
00182 
00183   // findRcmsStateListener
00184   fsm_.findRcmsStateListener();
00185   
00186   // initialize monitoring infospace
00187 
00188   std::string monInfoSpaceName="evf-eventprocessor-status-monitor";
00189   toolbox::net::URN urn = this->createQualifiedInfoSpace(monInfoSpaceName);
00190   monitorInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00191 
00192   std::string monLegendaInfoSpaceName="evf-eventprocessor-status-legenda";
00193   urn = this->createQualifiedInfoSpace(monLegendaInfoSpaceName);
00194   monitorLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00195 
00196   
00197   monitorInfoSpace_->fireItemAvailable("url",                      &url_            );
00198   monitorInfoSpace_->fireItemAvailable("class",                    &class_          );
00199   monitorInfoSpace_->fireItemAvailable("instance",                 &instance_       );
00200   monitorInfoSpace_->fireItemAvailable("runNumber",                &runNumber_      );
00201   monitorInfoSpace_->fireItemAvailable("stateName",                 fsm_.stateName()); 
00202 
00203   monitorInfoSpace_->fireItemAvailable("squidPresent",             &squidPresent_   );
00204 
00205   std::string scalersInfoSpaceName="evf-eventprocessor-scalers-monitor";
00206   urn = this->createQualifiedInfoSpace(scalersInfoSpaceName);
00207   scalersInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00208 
00209   std::string scalersLegendaInfoSpaceName="evf-eventprocessor-scalers-legenda";
00210   urn = this->createQualifiedInfoSpace(scalersLegendaInfoSpaceName);
00211   scalersLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00212 
00213 
00214 
        // Hand the freshly created infospaces to the event-processor wrapper.
00215   evtProcessor_.setScalersInfoSpace(scalersInfoSpace_,scalersLegendaInfoSpace_);
00216   scalersInfoSpace_->fireItemAvailable("instance", &instance_);
00217 
00218   evtProcessor_.setApplicationInfoSpace(ispace);
00219   evtProcessor_.setMonitorInfoSpace(monitorInfoSpace_,monitorLegendaInfoSpace_);
00220   evtProcessor_.publishConfigAndMonitorItems(nbSubProcesses_.value_!=0);
00221 
00222   //subprocess state vectors for MP
00223   monitorInfoSpace_->fireItemAvailable("epMacroStateInt",             &spMStates_); 
00224   monitorInfoSpace_->fireItemAvailable("epMicroStateInt",             &spmStates_); 
00225   
00226   // Bind web interface
00227   xgi::bind(this, &FUEventProcessor::css,              "styles.css");
00228   xgi::bind(this, &FUEventProcessor::defaultWebPage,   "Default"   );
00229   xgi::bind(this, &FUEventProcessor::spotlightWebPage, "Spotlight" );
00230   xgi::bind(this, &FUEventProcessor::scalersWeb,       "scalersWeb");
00231   xgi::bind(this, &FUEventProcessor::pathNames,        "pathNames" );
00232   xgi::bind(this, &FUEventProcessor::subWeb,           "SubWeb"    );
00233   xgi::bind(this, &FUEventProcessor::moduleWeb,        "moduleWeb" );
00234   xgi::bind(this, &FUEventProcessor::serviceWeb,       "serviceWeb");
00235   xgi::bind(this, &FUEventProcessor::microState,       "microState");
00236   xgi::bind(this, &FUEventProcessor::updater,          "updater"   );
00237   xgi::bind(this, &FUEventProcessor::procStat,         "procStat"  );
00238 
00239   // instantiate the plugin manager, not referenced here after!
00240 
00241   edm::AssertHandler ah;
00242 
        // The presence object returned by makePresence() is release()d and never
        // stored, so it is deliberately not destroyed by this class.
00243   try{
00244     LOG4CPLUS_DEBUG(getApplicationLogger(),
00245                     "Trying to create message service presence ");
00246     edm::PresenceFactory *pf = edm::PresenceFactory::get();
00247     if(pf != 0) {
00248       pf->makePresence("MessageServicePresence").release();
00249     }
00250     else {
00251       LOG4CPLUS_ERROR(getApplicationLogger(),
00252                       "Unable to create message service presence ");
00253     }
00254   } 
00255   catch(...) {
00256     LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
00257   }
00258   ML::MLlog4cplus::setAppl(this);      
00259 
00260   typedef std::set<xdaq::ApplicationDescriptor*> AppDescSet_t;
00261   typedef AppDescSet_t::iterator                 AppDescIter_t;
00262   
        // Use the first "RCMSStateListener" descriptor found in the default zone,
        // if there is one; otherwise only warn.
00263   AppDescSet_t rcms=
00264     getApplicationContext()->getDefaultZone()->
00265     getApplicationDescriptors("RCMSStateListener");
00266   if(rcms.size()==0) 
00267     {
00268       LOG4CPLUS_WARN(getApplicationLogger(),
00269                        "MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
00270       //        localLog("-W- MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
00271     }
00272   else
00273     {
00274       AppDescIter_t it = rcms.begin();
00275       evtProcessor_.setRcms(*it);
00276     }
00277   pthread_mutex_init(&start_lock_,0);
00278   pthread_mutex_init(&stop_lock_,0);
00279   pthread_mutex_init(&pickup_lock_,0);
00280 
00281   makeStaticInfo();
00282   startSupervisorLoop();  
00283 
        // vulture_ is zero-initialized above, so this allocates it exactly once;
        // the destructor deletes it.
00284   if(vulture_==0) vulture_ = new Vulture(true);
00285 
00287 
        // Point iDieUrl_ at the instance-0 "evf::iDie" application, if any;
        // otherwise it keeps its initial value "none".
00288   AppDescSet_t setOfiDie=
00289     getApplicationContext()->getDefaultZone()->
00290     getApplicationDescriptors("evf::iDie");
00291   
00292   for (AppDescIter_t it=setOfiDie.begin();it!=setOfiDie.end();++it)
00293     if ((*it)->getInstance()==0) // there has to be only one instance of iDie
00294       iDieUrl_ = (*it)->getContextDescriptor()->getURL() + "/" + (*it)->getURN();
00295 }
00296 //___________here ends the *huge* constructor___________________________________
00297 
00298 
00299 //______________________________________________________________________________
      // Destructor: frees the Vulture helper allocated by the constructor.
00300 FUEventProcessor::~FUEventProcessor()
00301 {
00302   // The event-processor wrapper is a by-value member, so it is destroyed
00303   // implicitly and must not be deleted here (to be revised; in practice the
00304   // process usually ends through "kill" and never reaches this point).
00305   delete vulture_; // deleting a null pointer is a no-op, so no guard is needed
00306 }
00307 
00308 
00310 // implementation of member functions
00312 
00313 
00314 //______________________________________________________________________________
      // Work-loop action for the Configure transition.
      //
      // Sizes the sub-process state vectors, initializes the event processor from
      // configString_, runs beginJob(), (re)creates the CPU/rate statistics
      // helpers and, when iDieStatisticsGathering_ is set, sends their legends.
      // On success "ConfigureDone" is fired; any exception escaping the try block
      // moves the state machine to failed with the reason recorded in
      // reasonForFailedState_ and in the local log.
      //
      // @param wl  work loop invoking this action (not used in the body).
      // @return    always false.
00315 bool FUEventProcessor::configuring(toolbox::task::WorkLoop* wl)
00316 {
00317 //   std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
00318 //          << ((instance_.value_==0) ? 0x8 : 0) << " "
00319 //          << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
00320 //          << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
00321 //          << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
        // Bit mask passed to evtProcessor_.init():
        //   0x10 sub-processes configured, 0x8 instance number is a multiple of 80,
        //   0x4 service web registry, 0x2 module web registry, 0x1 prescale service.
00322   unsigned short smap 
00323     = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00324     + (((instance_.value_%80)==0) ? 0x8 : 0) // have at least one legend per slice
00325     + (hasServiceWebRegistry_.value_ ? 0x4 : 0) 
00326     + (hasModuleWebRegistry_.value_ ? 0x2 : 0) 
00327     + (hasPrescaleService_.value_ ? 0x1 : 0);
        // One state slot when running without sub-processes, otherwise one slot
        // per sub-process, each reset to sInit / 0.
00328   if(nbSubProcesses_.value_==0) 
00329     {
00330       spMStates_.setSize(1); 
00331       spmStates_.setSize(1); 
00332     }
00333   else
00334     {
00335       spMStates_.setSize(nbSubProcesses_.value_);
00336       spmStates_.setSize(nbSubProcesses_.value_);
00337       for(unsigned int i = 0; i < spMStates_.size(); i++)
00338         {
00339           spMStates_[i] = edm::event_processor::sInit; 
00340           spmStates_[i] = 0; 
00341         }
00342     }
00343   try {
00344     LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ...");
00345     std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00346     epInitialized_=true;
00347     if(evtProcessor_)
00348       {
00349         // moved to wrapper class
00350         configuration_ = evtProcessor_.configuration();
00351         if(nbSubProcesses_.value_==0) evtProcessor_.startMonitoringWorkLoop(); 
00352         evtProcessor_->beginJob(); 
              // Replace any statistics helpers left over from an earlier configure.
00353         if(cpustat_) {delete cpustat_; cpustat_=0;}
00354         cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00355                                nbSubProcesses_.value_,
00356                                instance_.value_,
00357                                iDieUrl_.value_);
00358         if(ratestat_) {delete ratestat_; ratestat_=0;}
00359         ratestat_ = new RateStat(iDieUrl_.value_);
00360         if(iDieStatisticsGathering_.value_){
                // Failing to send a legend is logged but does not fail configuration.
00361           try{
00362             cpustat_->sendLegenda(evtProcessor_.getmicromap());
00363             xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00364             if(legenda !=0){
00365               std::string slegenda = ((xdata::String*)legenda)->value_;
00366               ratestat_->sendLegenda(slegenda);
00367             }
00368 
00369           }
00370           catch(evf::Exception &e){
00371             LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
00372                            << e.what());
00373           }
00374         }
00375         
00376         fsm_.fireEvent("ConfigureDone",this);
00377         LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!");
00378         localLog("-I- Configuration completed");
00379 
00380       }
00381   }
00382   catch (xcept::Exception &e) {
00383     reasonForFailedState_ = "configuring FAILED: " + (std::string)e.what();
00384     fsm_.fireFailed(reasonForFailedState_,this);
00385     localLog(reasonForFailedState_);
00386   }
00387   catch(cms::Exception &e) {
00388     reasonForFailedState_ = e.explainSelf();
00389     fsm_.fireFailed(reasonForFailedState_,this);
00390     localLog(reasonForFailedState_);
00391   }    
00392   catch(std::exception &e) {
00393     reasonForFailedState_ = e.what();
00394     fsm_.fireFailed(reasonForFailedState_,this);
00395     localLog(reasonForFailedState_);
00396   }
00397   catch(...) {
          // Record and log the reason like the typed handlers above, so that an
          // unknown failure does not leave a stale reasonForFailedState_ behind.
          reasonForFailedState_ = std::string("Unknown Exception");
00398     fsm_.fireFailed(reasonForFailedState_,this);
          localLog(reasonForFailedState_);
00399   }
00400 
        // Create the vulture process once; later configure passes keep the
        // existing vp_.
00401   if(vulture_!=0 && vp_ == 0) vp_ = vulture_->makeProcess();
00402 
00403   return false;
00404 }
00405 
00406 
00407 
00408 
00409 //______________________________________________________________________________
      // Work-loop action for the Enable transition.
      //
      // Re-initializes the event processor from configString_ and, if it was not
      // already initialized (epInitialized_ false), runs beginJob() and rebuilds
      // the CPU/rate statistics helpers. Without sub-processes the result of
      // enableClassic() is returned. Otherwise one SubProcess entry per
      // configured sub-process is created and forked: each child returns the
      // result of enableMPEPSlave(), while the parent starts the summarize work
      // loop and the vulture, fires "EnableDone" and returns false.
      //
      // @param wl  work loop invoking this action (not used in the body).
      //
      // NOTE(review): unlike configuring(), init()/beginJob() are not wrapped in
      // a try block here - confirm that the caller handles exceptions.
00410 bool FUEventProcessor::enabling(toolbox::task::WorkLoop* wl)
00411 {
00412   nbTotalDQM_ = 0;
00413   scalersUpdates_ = 0;
00414 //   std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
00415 //          << ((instance_.value_==0) ? 0x8 : 0) << " "
00416 //          << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
00417 //          << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
00418 //          << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
        // Same bit mask as built in configuring():
        //   0x10 sub-processes configured, 0x8 instance number is a multiple of 80,
        //   0x4 service web registry, 0x2 module web registry, 0x1 prescale service.
00419   unsigned short smap 
00420     = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00421     + (((instance_.value_%80)==0) ? 0x8 : 0) // have at least one legend per slice
00422     + (hasServiceWebRegistry_.value_ ? 0x4 : 0) 
00423     + (hasModuleWebRegistry_.value_ ? 0x2 : 0) 
00424     + (hasPrescaleService_.value_ ? 0x1 : 0);
00425 
00426   LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling...");
        // epInitialized_ is cleared by actionPerformed() when "parameterSet" changes.
00427   if(!epInitialized_){
00428     evtProcessor_.forceInitEventProcessorMaybe();
00429   }
00430   std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00431 
        // Same post-init sequence as in configuring(): beginJob(), fresh statistics
        // helpers and, optionally, legend upload.
00432   if(!epInitialized_){
00433     evtProcessor_->beginJob(); 
00434     if(cpustat_) {delete cpustat_; cpustat_=0;}
00435     cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00436                            nbSubProcesses_.value_,
00437                            instance_.value_,
00438                            iDieUrl_.value_);
00439     if(ratestat_) {delete ratestat_; ratestat_=0;}
00440     ratestat_ = new RateStat(iDieUrl_.value_);
00441     if(iDieStatisticsGathering_.value_){
            // Legend upload failures are logged and otherwise ignored.
00442       try
00443         {
00444         cpustat_->sendLegenda(evtProcessor_.getmicromap());
00445         xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00446         if(legenda !=0)
00447           {
00448             std::string slegenda = ((xdata::String*)legenda)->value_;
00449             ratestat_->sendLegenda(slegenda);
00450           }
00451         }
00452       catch(evf::Exception &e)
00453         {
00454           LOG4CPLUS_INFO(getApplicationLogger(),"could not send legenda"
00455                          << e.what());
00456         }
00457       catch (xcept::Exception& e) {
00458         LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda."
00459                         << e.what());
00460       }
00461     }
00462     epInitialized_ = true;
00463   }
00464   configuration_ = evtProcessor_.configuration(); // get it again after init has been carried out...
00465   evtProcessor_.resetLumiSectionReferenceIndex();
00466   //classic appl will return here 
00467   if(nbSubProcesses_.value_==0) return enableClassic();
00468   //protect manipulation of subprocess array
00469   pthread_mutex_lock(&start_lock_);
00470   subs_.clear();
00471   subs_.resize(nbSubProcesses_.value_); // this should not be necessary
        // Every entry starts out with pid -1 until it is forked below.
00472   pid_t retval = -1;
00473   for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00474     {
00475       subs_[i]=SubProcess(i,retval); //this will replace all the scattered variables
00476     }
00477   pthread_mutex_unlock(&start_lock_);
00478 
        // Fork the sub-processes one by one. A zero return value selects the child
        // branch, which leaves this function through enableMPEPSlave().
        // NOTE(review): other forkNew() results (e.g. a fork failure) are not
        // examined here - confirm how forkNew() reports errors.
00479   for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00480     {
00481       retval = subs_[i].forkNew();
00482       if(retval==0)
00483         {
00484           myProcess_ = &subs_[i];
00485           // dirty hack: delete/recreate global binary semaphore for later use in child
00486           delete toolbox::mem::_s_mutex_ptr_;
00487           toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
                // This int shadows the outer pid_t retval; it only carries the
                // pthread return codes for re-creating stop_lock_ in the child.
00488           int retval = pthread_mutex_destroy(&stop_lock_);
00489           if(retval != 0) perror("error");
00490           retval = pthread_mutex_init(&stop_lock_,0);
00491           if(retval != 0) perror("error");
00492           fsm_.disableRcmsStateNotification();
00493           return enableMPEPSlave();
00494           // the loop is broken in the child 
00495         }
00496     }
00497   
        // Parent only from here on.
00498   startSummarizeWorkLoop();
00499   vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00500 
00501   LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00502   fsm_.fireEvent("EnableDone",this);
00503   localLog("-I- Start completed");
00504   return false;
00505 }
00506 
00507 
00508 //______________________________________________________________________________
      // Work-loop action for the Stop transition: stops the slaves when
      // sub-processes are configured, stops the vulture, then returns whatever
      // stopClassic() returns.
      //
      // @param wl  work loop invoking this action (not used in the body).
00509 bool FUEventProcessor::stopping(toolbox::task::WorkLoop* wl)
00510 {
00511   const bool hasSlaves = (nbSubProcesses_.value_!=0);
00512   if(hasSlaves) {
          stopSlavesAndAcknowledge();
        }
00513   vulture_->stop();
        const bool workLoopResult = stopClassic();
00514   return workLoopResult;
00515 }
00516 
00517 
00518 //______________________________________________________________________________
      // Work-loop action for the Halt transition.
      //
      // Stops the slaves when sub-processes are configured and asks the
      // event-processor wrapper to stop and halt. An evf::Exception is recorded
      // in reasonForFailedState_, logged locally and reported to the state
      // machine as a failure; "HaltDone" is fired afterwards in every case.
      //
      // @param wl  work loop invoking this action (not used in the body).
      // @return    always false.
00519 bool FUEventProcessor::halting(toolbox::task::WorkLoop* wl)
00520 {
00521   LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ...");
00522   const bool hasSlaves = (nbSubProcesses_.value_!=0);
00523   if(hasSlaves) {
          stopSlavesAndAcknowledge();
        }
      
00524   try {
00525     evtProcessor_.stopAndHalt();
00526   }
00527   catch (evf::Exception &haltError) {
          const std::string cause(haltError.what());
00528     reasonForFailedState_ = "halting FAILED: " + cause;
00529     localLog(reasonForFailedState_);
00530     fsm_.fireFailed(reasonForFailedState_,this);
00531   }
00532   // (detaching DQM from shared memory is no longer done here)
00533 
00534   LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!");
00535   fsm_.fireEvent("HaltDone",this);
00537   localLog("-I- Halt completed");
00536 
00538   return false;
00539 }
00540 
00541 
00542 //______________________________________________________________________________
      // Hands an incoming state-machine command message to the state machine and
      // returns its reply unchanged.
      //
      // @param msg  incoming command message.
      // @return     the reply produced by fsm_.commandCallback().
00543 xoap::MessageReference FUEventProcessor::fsmCallback(xoap::MessageReference msg)
00544   throw (xoap::exception::Exception)
00545 {
00546   xoap::MessageReference reply = fsm_.commandCallback(msg);
        return reply;
00547 }
00548 
00549 
00550 //______________________________________________________________________________
      // Infospace listener callback.
      //
      // Reacts only to "ItemChangedEvent" notifications received while the state
      // machine is in a state other than "Halted":
      //   - "parameterSet"          clears epInitialized_ to force reinitialization;
      //   - "outputEnabled"         toggles the end paths when the value changed;
      //   - "globalInputPrescale" / "globalOutputPrescale" only emit a warning.
      //
      // @param e  the infospace event.
00551 void FUEventProcessor::actionPerformed(xdata::Event& e)
00552 {
00553   const bool isItemChange = (e.type()=="ItemChangedEvent");
        if(!isItemChange) return;
00554   const bool outsideHalted = (fsm_.stateName()->toString()!="Halted");
        if(!outsideHalted) return;
      
00555   const std::string changedItem = dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00556 
        // The item names are mutually exclusive, so at most one branch runs.
00557   if(changedItem == "parameterSet") {
00558     LOG4CPLUS_WARN(getApplicationLogger(),
00559                    "HLT Menu changed, force reinitialization of EventProcessor");
00560     epInitialized_ = false;
00561   }
00562   else if(changedItem == "outputEnabled") {
          // Act only on an actual change with respect to the last applied value.
00563     if(outprev_ != outPut_) {
00564       LOG4CPLUS_WARN(getApplicationLogger(),
00565                      (outprev_ ? "Disabling " : "Enabling ")<<"global output");
00566       evtProcessor_->enableEndPaths(outPut_);
00567       outprev_ = outPut_;
00568     }
00569   }
00570   else if(changedItem == "globalInputPrescale") {
00571     LOG4CPLUS_WARN(getApplicationLogger(),
00572                    "Setting global input prescale has no effect "
00573                    <<"in this version of the code");
00574   }
00575   else if(changedItem == "globalOutputPrescale") {
00576     LOG4CPLUS_WARN(getApplicationLogger(),
00577                    "Setting global output prescale has no effect "
00578                    <<"in this version of the code");
00579   }
00582 }
00583 
00584 //______________________________________________________________________________
      // Web callback bound to "SubWeb": forwards a request to one sub-process and
      // relays that sub-process's answer to the browser.
      //
      // Query parameters read from the request:
      //   method   text placed at the start of the message sent to the sub-process;
      //   process  pid of the sub-process to address.
      // Nothing is written when either parameter is missing. An unknown pid
      // yields an "ERROR 404" line, a sub-process that is not alive an
      // "ERROR 405" line.
      //
      // The message text is <method>&<key>%<value>;... built from the CGI
      // environment. Each reply message carries, as decimal text, the number of
      // bytes to read from anonymousPipe_; chunks are collected until one is
      // smaller than MAX_PIPE_BUFFER_SIZE.
      //
      // @param in   incoming request.
      // @param out  response stream.
00585 void FUEventProcessor::subWeb(xgi::Input  *in, xgi::Output *out)
00586 {
00587   using namespace cgicc;
00588   pid_t pid = 0;
00589   std::ostringstream ost;
00590   ost << "&";
00591 
00592   Cgicc cgi(in);
00593   internal::MyCgi *mycgi = (internal::MyCgi*)in;
00594   for(std::map<std::string, std::string, std::less<std::string> >::iterator mit = 
00595         mycgi->getEnvironment().begin();
00596       mit != mycgi->getEnvironment().end(); mit++)
00597     ost << mit->first << "%" << mit->second << ";";
00599   std::vector<FormEntry> el1;
00600   cgi.getElement("method",el1);
00601   std::vector<FormEntry> el2;
00602   cgi.getElement("process",el2);
00603   if(el1.size()!=0) {
00604     std::string meth = el1[0].getValue();
00605     if(el2.size()!=0) {
00606       unsigned int i = 0;
00607       std::string mod = el2[0].getValue();
00608       pid = atoi(mod.c_str()); // get the process id to be polled
00609       for(; i < subs_.size(); i++)
00610         if(subs_[i].pid()==pid) break;
00611       if(i>=subs_.size()){ //process was not found, let the browser know
00612         *out << "ERROR 404 : Process " << pid << " Not Found !" << std::endl;
00613         return;
00614       } 
00615       if(subs_[i].alive() != 1){
00616         *out << "ERROR 405 : Process " << pid << " Not Alive !" << std::endl;
00617         return;
00618       }
            // Build the text once and copy it together with its terminating NUL;
            // the buffer is sized length+1, so the terminator always fits.
            const std::string payload = meth + ost.str();
00619       MsgBuf msg1(payload.length()+1,MSQM_MESSAGE_TYPE_WEB);
00620       strncpy(msg1->mtext,payload.c_str(),payload.length()+1);
00622       subs_[i].post(msg1,true);
            // Temporarily stretch the supervisor sleep time by a factor of ten
            // while waiting for the answer; restored after the loop.
00623       unsigned int keep_supersleep_original_value = superSleepSec_.value_;
00624       superSleepSec_.value_=10*keep_supersleep_original_value;
00625       MsgBuf msg(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_WEB);
00626       bool done = false;
00627       std::vector<std::string> pieces;
00628       while(!done){
00629         unsigned long retval1 = subs_[i].rcvNonBlocking(msg,true);
              // NOTE(review): presumably this value means "no message yet";
              // poll again after one second.
00630         if(retval1 == MSGQ_MESSAGE_TYPE_RANGE){
00631           ::sleep(1);
00632           continue;
00633         }
00634         unsigned int nbytes = atoi(msg->mtext);
00635         if(nbytes < MAX_PIPE_BUFFER_SIZE) done = true; // this will break the while loop
              // One extra, zero-filled byte guarantees that the chunk is a
              // terminated C string even after a short or failed read.
00636         std::vector<char> buf(nbytes+1,'\0');
00637         ssize_t retval = read(anonymousPipe_[PIPE_READ],&buf[0],nbytes);
              if(retval < 0) std::cout
                << "Failed to read from pipe in subWeb: " << strerror(errno) << std::endl;
00638         else if(static_cast<unsigned int>(retval)!=nbytes) std::cout 
00639           << "CAREFUL HERE, read less bytes than expected from pipe in subWeb" << std::endl;
              // Keep the text up to the first NUL, i.e. exactly what streaming
              // the buffer as a char* would have produced.
00640         pieces.push_back(std::string(&buf[0]));
00641       }
00642       superSleepSec_.value_=keep_supersleep_original_value;
00643       for(unsigned int j = 0; j < pieces.size(); j++){
00644         *out<<pieces[j];    // chain the buffers into the output strstream
00646       }
00647     }
00648   }
00649 }
00650 
00651 
00652 //______________________________________________________________________________
      // Web callback bound to "Default": emits a minimal HTML page whose title is
      // the application class name, "MP " when sub-processes are configured
      // (otherwise a single space) and the instance number, and which redirects
      // the browser immediately to /evf/html/defaultBasePage.html.
      //
      // @param in   incoming request (not used in the body).
      // @param out  response stream.
00653 void FUEventProcessor::defaultWebPage(xgi::Input  *in, xgi::Output *out)
00654   throw (xgi::exception::Exception)
00655 {
00656 
00657 
        // The redirect relies on the standard "http-equiv" meta attribute; any
        // other attribute name is ignored by browsers.
00658   *out << "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">" 
00659        << "<html><head><title>" << getApplicationDescriptor()->getClassName() << (nbSubProcesses_.value_ > 0 ? "MP " : " ") 
00660        << getApplicationDescriptor()->getInstance() << "</title>"
00661        << "<meta http-equiv=\"REFRESH\" content=\"0;url=/evf/html/defaultBasePage.html\">"
00662        << "</head></html>";
00663 }
00664 
00665 
00666 //______________________________________________________________________________
00667 
00668 
00669 void FUEventProcessor::spotlightWebPage(xgi::Input  *in, xgi::Output *out)
00670   throw (xgi::exception::Exception)
00671 {
00672 
00673   std::string urn = getApplicationDescriptor()->getURN();
00674 
00675   *out << "<!-- base href=\"/" <<  urn
00676        << "\"> -->" << std::endl;
00677   *out << "<html>"                                                   << std::endl;
00678   *out << "<head>"                                                   << std::endl;
00679   *out << "<link type=\"text/css\" rel=\"stylesheet\"";
00680   *out << " href=\"/evf/html/styles.css\"/>"                   << std::endl;
00681   *out << "<title>" << getApplicationDescriptor()->getClassName() 
00682        << getApplicationDescriptor()->getInstance() 
00683        << " MAIN</title>"     << std::endl;
00684   *out << "</head>"                                                  << std::endl;
00685   *out << "<body>"                                                   << std::endl;
00686   *out << "<table border=\"0\" width=\"100%\">"                      << std::endl;
00687   *out << "<tr>"                                                     << std::endl;
00688   *out << "  <td align=\"left\">"                                    << std::endl;
00689   *out << "    <img"                                                 << std::endl;
00690   *out << "     align=\"middle\""                                    << std::endl;
00691   *out << "     src=\"/evf/images/spoticon.jpg\""                            << std::endl;
00692   *out << "     alt=\"main\""                                        << std::endl;
00693   *out << "     width=\"64\""                                        << std::endl;
00694   *out << "     height=\"64\""                                       << std::endl;
00695   *out << "     border=\"\"/>"                                       << std::endl;
00696   *out << "    <b>"                                                  << std::endl;
00697   *out << getApplicationDescriptor()->getClassName() 
00698        << getApplicationDescriptor()->getInstance()                  << std::endl;
00699   *out << "      " << fsm_.stateName()->toString()                   << std::endl;
00700   *out << "    </b>"                                                 << std::endl;
00701   *out << "  </td>"                                                  << std::endl;
00702   *out << "  <td width=\"32\">"                                      << std::endl;
00703   *out << "    <a href=\"/urn:xdaq-application:lid=3\">"             << std::endl;
00704   *out << "      <img"                                               << std::endl;
00705   *out << "       align=\"middle\""                                  << std::endl;
00706   *out << "       src=\"/hyperdaq/images/HyperDAQ.jpg\""             << std::endl;
00707   *out << "       alt=\"HyperDAQ\""                                  << std::endl;
00708   *out << "       width=\"32\""                                      << std::endl;
00709   *out << "       height=\"32\""                                     << std::endl;
00710   *out << "       border=\"\"/>"                                     << std::endl;
00711   *out << "    </a>"                                                 << std::endl;
00712   *out << "  </td>"                                                  << std::endl;
00713   *out << "  <td width=\"32\">"                                      << std::endl;
00714   *out << "  </td>"                                                  << std::endl;
00715   *out << "  <td width=\"32\">"                                      << std::endl;
00716   *out << "    <a href=\"/" << urn << "/\">"                         << std::endl;
00717   *out << "      <img"                                               << std::endl;
00718   *out << "       align=\"middle\""                                  << std::endl;
00719   *out << "       src=\"/evf/images/epicon.jpg\""                    << std::endl;
00720   *out << "       alt=\"main\""                                      << std::endl;
00721   *out << "       width=\"32\""                                      << std::endl;
00722   *out << "       height=\"32\""                                     << std::endl;
00723   *out << "       border=\"\"/>"                                     << std::endl;
00724   *out << "    </a>"                                                 << std::endl;
00725   *out << "  </td>"                                                  << std::endl;
00726   *out << "</tr>"                                                    << std::endl;
00727   *out << "</table>"                                                 << std::endl;
00728 
00729   *out << "<hr/>"                                                    << std::endl;
00730   
00731   std::ostringstream ost;
00732   if(myProcess_) 
00733     ost << "/SubWeb?process=" << getpid() << "&method=moduleWeb&";
00734   else
00735     ost << "/moduleWeb?";
00736   urn += ost.str();
00737   if(evtProcessor_ && (myProcess_ || nbSubProcesses_.value_==0))
00738     evtProcessor_.taskWebPage(in,out,urn);
00739   else if(evtProcessor_)
00740     evtProcessor_.summaryWebPage(in,out,urn);
00741   else
00742     *out << "<td>HLT Unconfigured</td>" << std::endl;
00743   *out << "</table>"                                                 << std::endl;
00744   
00745   *out << "<br><textarea rows=" << 10 << " cols=80 scroll=yes>"      << std::endl;
00746   *out << configuration_                                             << std::endl;
00747   *out << "</textarea><P>"                                           << std::endl;
00748   
00749   *out << "</body>"                                                  << std::endl;
00750   *out << "</html>"                                                  << std::endl;
00751 
00752 
00753 }
00754 void FUEventProcessor::scalersWeb(xgi::Input  *in, xgi::Output *out)
00755   throw (xgi::exception::Exception)
00756 {
00757 
00758   out->getHTTPResponseHeader().addHeader( "Content-Type",
00759                                           "application/octet-stream" );
00760   out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
00761                                           "binary" );
00762   if(evtProcessor_ != 0){
00763     out->write( (char*)(evtProcessor_.getPackedTriggerReportAsStruct()), sizeof(TriggerReportStatic) );
00764   }
00765 }
00766 
00767 void FUEventProcessor::pathNames(xgi::Input  *in, xgi::Output *out)
00768   throw (xgi::exception::Exception)
00769 {
00770 
00771   if(evtProcessor_ != 0){
00772     xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00773     if(legenda !=0){
00774       std::string slegenda = ((xdata::String*)legenda)->value_;
00775       *out << slegenda << std::endl;
00776     }
00777   }
00778 }
00779 
00780 void FUEventProcessor::attachDqmToShm() throw (evf::Exception)  
00781 {
00782   std::string errmsg;
00783   bool success = false;
00784   try {
00785     edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
00786     if(edm::Service<FUShmDQMOutputService>().isAvailable())
00787       success = edm::Service<FUShmDQMOutputService>()->attachToShm();
00788     if (!success) errmsg = "Failed to attach DQM service to shared memory";
00789   }
00790   catch (cms::Exception& e) {
00791     errmsg = "Failed to attach DQM service to shared memory: " + (std::string)e.what();
00792   }
00793   if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
00794 }
00795 
00796 
00797 
00798 void FUEventProcessor::detachDqmFromShm() throw (evf::Exception)
00799 {
00800   std::string errmsg;
00801   bool success = false;
00802   try {
00803     edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
00804     if(edm::Service<FUShmDQMOutputService>().isAvailable())
00805       success = edm::Service<FUShmDQMOutputService>()->detachFromShm();
00806     if (!success) errmsg = "Failed to detach DQM service from shared memory";
00807   }
00808   catch (cms::Exception& e) {
00809     errmsg = "Failed to detach DQM service from shared memory: " + (std::string)e.what();
00810   }
00811   if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
00812 }
00813 
00814 
00815 std::string FUEventProcessor::logsAsString()
00816 {
00817   std::ostringstream oss;
00818   if(logWrap_)
00819     {
00820       for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
00821         oss << logRing_[i] << std::endl;
00822       for(unsigned int i = 0; i <  logRingIndex_; i++)
00823         oss << logRing_[i] << std::endl;
00824     }
00825   else
00826       for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
00827         oss << logRing_[i] << std::endl;
00828     
00829   return oss.str();
00830 }
00831   
00832 void FUEventProcessor::localLog(std::string m)
00833 {
00834   timeval tv;
00835 
00836   gettimeofday(&tv,0);
00837   tm *uptm = localtime(&tv.tv_sec);
00838   char datestring[256];
00839   strftime(datestring, sizeof(datestring),"%c", uptm);
00840 
00841   if(logRingIndex_ == 0){logWrap_ = true; logRingIndex_ = logRingSize_;}
00842   logRingIndex_--;
00843   std::ostringstream timestamp;
00844   timestamp << " at " << datestring;
00845   m += timestamp.str();
00846   logRing_[logRingIndex_] = m;
00847 }
00848 
00849 void FUEventProcessor::startSupervisorLoop()
00850 {
00851   try {
00852     wlSupervising_=
00853       toolbox::task::getWorkLoopFactory()->getWorkLoop("Supervisor",
00854                                                        "waiting");
00855     if (!wlSupervising_->isActive()) wlSupervising_->activate();
00856     asSupervisor_ = toolbox::task::bind(this,&FUEventProcessor::supervisor,
00857                                         "Supervisor");
00858     wlSupervising_->submit(asSupervisor_);
00859     supervising_ = true;
00860   }
00861   catch (xcept::Exception& e) {
00862     std::string msg = "Failed to start workloop 'Supervisor'.";
00863     XCEPT_RETHROW(evf::Exception,msg,e);
00864   }
00865 }
00866 
00867 void FUEventProcessor::startReceivingLoop()
00868 {
00869   try {
00870     wlReceiving_=
00871       toolbox::task::getWorkLoopFactory()->getWorkLoop("Receiving",
00872                                                        "waiting");
00873     if (!wlReceiving_->isActive()) wlReceiving_->activate();
00874     asReceiveMsgAndExecute_ = toolbox::task::bind(this,&FUEventProcessor::receiving,
00875                                         "Receiving");
00876     wlReceiving_->submit(asReceiveMsgAndExecute_);
00877     receiving_ = true;
00878   }
00879   catch (xcept::Exception& e) {
00880     std::string msg = "Failed to start workloop 'Receiving'.";
00881     XCEPT_RETHROW(evf::Exception,msg,e);
00882   }
00883 }
00884 void FUEventProcessor::startReceivingMonitorLoop()
00885 {
00886   try {
00887     wlReceivingMonitor_=
00888       toolbox::task::getWorkLoopFactory()->getWorkLoop("ReceivingM",
00889                                                        "waiting");
00890     if (!wlReceivingMonitor_->isActive()) 
00891       wlReceivingMonitor_->activate();
00892     asReceiveMsgAndRead_ = 
00893       toolbox::task::bind(this,&FUEventProcessor::receivingAndMonitor,
00894                           "ReceivingM");
00895     wlReceivingMonitor_->submit(asReceiveMsgAndRead_);
00896     receivingM_ = true;
00897   }
00898   catch (xcept::Exception& e) {
00899     std::string msg = "Failed to start workloop 'ReceivingM'.";
00900     XCEPT_RETHROW(evf::Exception,msg,e);
00901   }
00902 }
00903 
00904 bool FUEventProcessor::receiving(toolbox::task::WorkLoop *)
00905 {
00906   MsgBuf msg;
00907   try{
00908     myProcess_->rcvSlave(msg,false); //will receive only messages from Master
00909     if(msg->mtype==MSQM_MESSAGE_TYPE_STOP)
00910       {
00911         pthread_mutex_lock(&stop_lock_);
00912         fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
00913         try{
00914           LOG4CPLUS_DEBUG(getApplicationLogger(),
00915                           "Trying to create message service presence ");
00916           edm::PresenceFactory *pf = edm::PresenceFactory::get();
00917           if(pf != 0) {
00918             pf->makePresence("MessageServicePresence").release();
00919           }
00920           else {
00921             LOG4CPLUS_ERROR(getApplicationLogger(),
00922                             "Unable to create message service presence ");
00923           }
00924         } 
00925         catch(...) {
00926           LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
00927         }
00928         stopClassic(); // call the normal sequence of stopping - as this is allowed to fail provisions must be made ...@@@EM
00929         MsgBuf msg1(0,MSQS_MESSAGE_TYPE_STOP);
00930         myProcess_->postSlave(msg1,false);
00931         pthread_mutex_unlock(&stop_lock_);
00932         fclose(stdout);
00933         fclose(stderr);
00934         _exit(EXIT_SUCCESS);
00935       }
00936     if(msg->mtype==MSQM_MESSAGE_TYPE_FSTOP)
00937       _exit(EXIT_SUCCESS);
00938   }
00939   catch(evf::Exception &e){}
00940   return true;
00941 }
00942 
00943 bool FUEventProcessor::supervisor(toolbox::task::WorkLoop *)
00944 {
00945   pthread_mutex_lock(&stop_lock_);
00946   if(subs_.size()!=nbSubProcesses_.value_)
00947     {
00948       subs_.resize(nbSubProcesses_.value_);
00949       spMStates_.resize(nbSubProcesses_.value_);
00950       spmStates_.resize(nbSubProcesses_.value_);
00951       for(unsigned int i = 0; i < spMStates_.size(); i++)
00952         {
00953           spMStates_[i] = edm::event_processor::sInit; 
00954           spmStates_[i] = 0; 
00955         }
00956     }
00957   bool running = fsm_.stateName()->toString()=="Enabled";
00958   bool stopping = fsm_.stateName()->toString()=="stopping";
00959   for(unsigned int i = 0; i < subs_.size(); i++)
00960     {
00961       if(subs_[i].alive()==-1000) continue;
00962       int sl;
00963 
00964       pid_t killedOrNot = waitpid(subs_[i].pid(),&sl,WNOHANG);
00965 
00966       if(killedOrNot==subs_[i].pid()) subs_[i].setStatus((WIFEXITED(sl) != 0 ? 0 : -1));
00967       else continue;
00968       pthread_mutex_lock(&pickup_lock_);
00969       std::ostringstream ost;
00970       if(subs_[i].alive()==0) ost << " process exited with status " << WEXITSTATUS(sl);
00971       else if(WIFSIGNALED(sl)!=0) ost << " process terminated with signal " << WTERMSIG(sl);
00972       else ost << " process stopped ";
00973       subs_[i].countdown()=slaveRestartDelaySecs_.value_;
00974       subs_[i].setReasonForFailed(ost.str());
00975       spMStates_[i] = evtProcessor_.notstarted_state_code();
00976       spmStates_[i] = 0;
00977       std::ostringstream ost1;
00978       ost1 << "-E- Slave " << subs_[i].pid() << ost.str();
00979       localLog(ost1.str());
00980       if(!autoRestartSlaves_.value_) subs_[i].disconnect();
00981       pthread_mutex_unlock(&pickup_lock_);
00982     }
00983   pthread_mutex_unlock(&stop_lock_);    
00984   if(stopping) return true; // if in stopping we are done
00985 
00986   if(running)
00987     {
00988       // if enabled, this loop will periodically check if dead slaves countdown has expired and restart them
00989       // this is only active while running, hence, the stop lock is acquired and only released at end of loop
00990       if(autoRestartSlaves_.value_){
00991         pthread_mutex_lock(&stop_lock_); //lockout slave killing at stop while you check for restarts
00992         for(unsigned int i = 0; i < subs_.size(); i++)
00993           {
00994             if(subs_[i].alive() != 1){
00995               if(subs_[i].countdown() == 0)
00996                 {
00997                   if(subs_[i].restartCount()>2){
00998                     LOG4CPLUS_WARN(getApplicationLogger()," Not restarting subprocess in slot " << i 
00999                                    << " - maximum restart count reached");
01000                     std::ostringstream ost1;
01001                     ost1 << "-W- Dead Process in slot " << i << " reached maximum restart count"; 
01002                     localLog(ost1.str());
01003                     subs_[i].countdown()--;
01004                     XCEPT_DECLARE(evf::Exception,
01005                                   sentinelException, ost1.str());
01006                     notifyQualified("error",sentinelException);
01007                     subs_[i].disconnect();
01008                     continue;
01009                   }
01010                   subs_[i].restartCount()++;
01011                   pid_t rr = subs_[i].forkNew();
01012                   if(rr==0)
01013                     {
01014                       myProcess_=&subs_[i];
01015                       scalersUpdates_ = 0;
01016                       int retval = pthread_mutex_destroy(&stop_lock_);
01017                       if(retval != 0) perror("error");
01018                       retval = pthread_mutex_init(&stop_lock_,0);
01019                       if(retval != 0) perror("error");
01020                       fsm_.disableRcmsStateNotification();
01021                       fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
01022                       fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition
01023                       fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition
01024                       try{
01025                         xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
01026                         if(lsid) {
01027                           ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died
01028                         }
01029                       }
01030                       catch(...){
01031                         std::cout << "trouble with lsindex during restart" << std::endl;
01032                       }
01033                       try{
01034                         xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
01035                         if(lstb) {
01036                           ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting
01037                         }
01038                       }
01039                       catch(...){
01040                         std::cout << "trouble with resetting flag for eol recovery " << std::endl;
01041                       }
01042 
01043                       evtProcessor_.adjustLsIndexForRestart();
01044                       evtProcessor_.resetPackedTriggerReport();
01045                       enableMPEPSlave();
01046                       return false; // exit the supervisor loop immediately in the child !!!
01047                     }
01048                   else
01049                     {
01050                       std::ostringstream ost1;
01051                       ost1 << "-I- New Process " << rr << " forked for slot " << i; 
01052                       localLog(ost1.str());
01053                     }
01054                 }
01055               if(subs_[i].countdown()>=0) subs_[i].countdown()--;
01056             }
01057           }
01058         pthread_mutex_unlock(&stop_lock_);
01059       } // finished handling replacement of dead slaves once they've been reaped
01060     }
01061   xdata::Serializable *lsid = 0; 
01062   xdata::Serializable *psid = 0;
01063   xdata::Serializable *dqmp = 0;
01064   xdata::UnsignedInteger32 *dqm = 0;
01065 
01066 
01067   
01068   if(running){  
01069     try{
01070       lsid = applicationInfoSpace_->find("lumiSectionIndex");
01071       psid = applicationInfoSpace_->find("prescaleSetIndex");
01072       nbProcessed = monitorInfoSpace_->find("nbProcessed");
01073       nbAccepted  = monitorInfoSpace_->find("nbAccepted");
01074       dqmp = applicationInfoSpace_-> find("nbDqmUpdates");      
01075     }
01076     catch(xdata::exception::Exception e){
01077       LOG4CPLUS_INFO(getApplicationLogger(),"could not retrieve some data - " << e.what());    
01078     }
01079 
01080     try{
01081       if(nbProcessed !=0 && nbAccepted !=0)
01082         {
01083           xdata::UnsignedInteger32*nbp = ((xdata::UnsignedInteger32*)nbProcessed);
01084           xdata::UnsignedInteger32*nba = ((xdata::UnsignedInteger32*)nbAccepted);
01085           xdata::UnsignedInteger32*ls  = ((xdata::UnsignedInteger32*)lsid);
01086           xdata::UnsignedInteger32*ps  = ((xdata::UnsignedInteger32*)psid);
01087           if(dqmp!=0)
01088             dqm = (xdata::UnsignedInteger32*)dqmp;
01089           if(dqm) dqm->value_ = 0;
01090           nbTotalDQM_ = 0;
01091           nbp->value_ = 0;
01092           nba->value_ = 0;
01093           nblive_ = 0;
01094           nbdead_ = 0;
01095           scalersUpdates_ = 0;
01096 
01097           for(unsigned int i = 0; i < subs_.size(); i++)
01098             {
01099               if(subs_[i].alive()>0)
01100                 {
01101                   nblive_++;
01102                   try{
01103                     subs_[i].post(master_message_prg_,true);
01104                     
01105                     unsigned long retval = subs_[i].rcvNonBlocking(master_message_prr_,true);
01106                     if(retval == (unsigned long) master_message_prr_->mtype){
01107                       prg* p = (struct prg*)(master_message_prr_->mtext);
01108                       subs_[i].setParams(p);
01109                       spMStates_[i] = p->Ms;
01110                       spmStates_[i] = p->ms;
01111                       cpustat_->addEntry(p->ms);
01112                       if(!subs_[i].inInconsistentState() && 
01113                          (p->Ms == edm::event_processor::sError 
01114                           || p->Ms == edm::event_processor::sInvalid
01115                           || p->Ms == edm::event_processor::sStopping))
01116                         {
01117                           std::ostringstream ost;
01118                           ost << "edm::eventprocessor slot " << i << " process id " 
01119                               << subs_[i].pid() << " not in Running state : Mstate=" 
01120                               << evtProcessor_.stateNameFromIndex(p->Ms) << " mstate="
01121                               << evtProcessor_.moduleNameFromIndex(p->ms) 
01122                               << " - Look into possible error messages from HLT process";
01123                           LOG4CPLUS_WARN(getApplicationLogger(),ost.str());
01124                         }
01125                       nbp->value_ += subs_[i].params().nbp;
01126                       nba->value_  += subs_[i].params().nba;
01127                       if(dqm)dqm->value_ += p->dqm;
01128                       nbTotalDQM_ +=  p->dqm;
01129                       scalersUpdates_ += p->trp;
01130                       if(p->ls > ls->value_) ls->value_ = p->ls;
01131                       if(p->ps != ps->value_) ps->value_ = p->ps;
01132                     }
01133                     else{
01134                       nbp->value_ += subs_[i].get_save_nbp();
01135                       nba->value_ += subs_[i].get_save_nba();
01136                     }
01137                   } 
01138                   catch(evf::Exception &e){
01139                     LOG4CPLUS_INFO(getApplicationLogger(),
01140                                    "could not send/receive msg on slot " 
01141                                    << i << " - " << e.what());    
01142                   }
01143                     
01144                 }
01145               else
01146                 {
01147                   nbp->value_ += subs_[i].get_save_nbp();
01148                   nba->value_ += subs_[i].get_save_nba();
01149                   nbdead_++;
01150                 }
01151             }
01152           if(nbp->value_>64){//have some slaves already processed more than one event ? (eventually make this == number of raw cells)
01153             for(unsigned int i = 0; i < subs_.size(); i++)
01154               {
01155                 if(subs_[i].params().nbp == 0){ // a slave has processed 0 events 
01156                   // check that the process is not stuck
01157                   if(subs_[i].alive()>0 && subs_[i].params().ms == 0) // the process is seen alive but in us=Invalid(0)
01158                     {
01159                       subs_[i].found_invalid();//increase the "found_invalid" counter
01160                       if(subs_[i].nfound_invalid() > 60){ //wait x monitor cycles (~1 min a good time ?) before doing something about a stuck slave
01161                         MsgBuf msg3(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_FSTOP);      // send a force-stop signal             
01162                         subs_[i].post(msg3,false);
01163                         std::ostringstream ost1;
01164                         ost1 << "-W- Process in slot " << i << " Never reached the running state - forcestopping it"; 
01165                         localLog(ost1.str());
01166                         LOG4CPLUS_ERROR(getApplicationLogger(),ost1.str());    
01167                         XCEPT_DECLARE(evf::Exception,
01168                                       sentinelException, ost1.str());
01169                         notifyQualified("error",sentinelException);
01170 
01171                       }
01172                     }
01173                 }
01174               }
01175           }
01176         }
01177     }
01178     catch(std::exception &e){
01179       LOG4CPLUS_INFO(getApplicationLogger(),"std exception - " << e.what());    
01180     }
01181     catch(...){
01182       LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception ");    
01183     }
01184   }
01185   else{
01186     for(unsigned int i = 0; i < subs_.size(); i++)
01187       {
01188         if(subs_[i].alive()==-1000)
01189           {
01190             spMStates_[i] = edm::event_processor::sInit;
01191             spmStates_[i] = 0;
01192           }
01193       }
01194   }
01195   try{
01196     monitorInfoSpace_->lock();
01197     monitorInfoSpace_->fireItemGroupChanged(names_,0);
01198     monitorInfoSpace_->unlock();
01199   }
01200   catch(xdata::exception::Exception &e)
01201     {
01202       LOG4CPLUS_ERROR(log_, "Exception from fireItemGroupChanged: " << e.what());
01203       //        localLog(e.what());
01204     }
01205   ::sleep(superSleepSec_.value_);       
01206   return true;
01207 }
01208 
01209 void FUEventProcessor::startScalersWorkLoop() throw (evf::Exception)
01210 {
01211   try {
01212     wlScalers_=
01213       toolbox::task::getWorkLoopFactory()->getWorkLoop("Scalers",
01214                                                        "waiting");
01215     if (!wlScalers_->isActive()) wlScalers_->activate();
01216     asScalers_ = toolbox::task::bind(this,&FUEventProcessor::scalers,
01217                                      "Scalers");
01218     
01219   wlScalers_->submit(asScalers_);
01220   wlScalersActive_ = true;
01221   }
01222   catch (xcept::Exception& e) {
01223     std::string msg = "Failed to start workloop 'Scalers'.";
01224     XCEPT_RETHROW(evf::Exception,msg,e);
01225   }
01226 }
01227 
01228 //______________________________________________________________________________
01229 
01230 void FUEventProcessor::startSummarizeWorkLoop() throw (evf::Exception)
01231 {
01232   try {
01233     wlSummarize_=
01234       toolbox::task::getWorkLoopFactory()->getWorkLoop("Summary",
01235                                                        "waiting");
01236     if (!wlSummarize_->isActive()) wlSummarize_->activate();
01237     
01238     asSummarize_ = toolbox::task::bind(this,&FUEventProcessor::summarize,
01239                                        "Summary");
01240 
01241     wlSummarize_->submit(asSummarize_);
01242     wlSummarizeActive_ = true;
01243   }
01244   catch (xcept::Exception& e) {
01245     std::string msg = "Failed to start workloop 'Summarize'.";
01246     XCEPT_RETHROW(evf::Exception,msg,e);
01247   }
01248 }
01249 
01250 //______________________________________________________________________________
01251 
01252 bool FUEventProcessor::scalers(toolbox::task::WorkLoop* wl)
01253 {
01254   if(evtProcessor_)
01255     {
01256       if(!evtProcessor_.getTriggerReport(true)) {
01257         wlScalersActive_ = false;
01258         return false;
01259       }
01260     }
01261   else
01262     {
01263       std::cout << getpid()<< " Scalers workloop, bailing out, no evtProcessor " << std::endl;
01264       wlScalersActive_ = false;
01265       return false;
01266     }
01267   if(myProcess_) 
01268     {
01269       //      std::cout << getpid() << "going to post on control queue from scalers" << std::endl;
01270       int ret = myProcess_->postSlave(evtProcessor_.getPackedTriggerReport(),false);
01271       if(ret!=0)      std::cout << "scalers workloop, error posting to sqs_ " << errno << std::endl;
01272       scalersUpdates_++;
01273     }
01274   else
01275     evtProcessor_.fireScalersUpdate();
01276   return true;
01277 }
01278 
01279 //______________________________________________________________________________
01280 bool FUEventProcessor::summarize(toolbox::task::WorkLoop* wl)
01281 {
01282   evtProcessor_.resetPackedTriggerReport();
01283   bool atLeastOneProcessUpdatedSuccessfully = false;
01284   int msgCount = 0;
01285   for (unsigned int i = 0; i < subs_.size(); i++)
01286     {
01287       if(subs_[i].alive()>0)
01288         {
01289           int ret = 0;
01290           if(subs_[i].check_postponed_trigger_update(master_message_trr_,
01291                                                      evtProcessor_.getLumiSectionReferenceIndex()))
01292             {
01293               ret = MSQS_MESSAGE_TYPE_TRR;
01294               std::cout << "using postponed report from slot " << i << " for ls " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01295             }
01296           else{
01297             bool insync = false;
01298             bool exception_caught = false;
01299             while(!insync){
01300               try{
01301                 ret = subs_[i].rcv(master_message_trr_,false);
01302               }
01303               catch(evf::Exception &e)
01304                 {
01305                   std::cout << "exception in msgrcv on " << i 
01306                             << " " << subs_[i].alive() << " " << strerror(errno) << std::endl;
01307                   exception_caught = true;
01308                   break;
01309                   //do nothing special
01310                 }
01311               if(ret==MSQS_MESSAGE_TYPE_TRR) {
01312                 TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
01313                 if(trp->lumiSection >= evtProcessor_.getLumiSectionReferenceIndex()){
01314                   insync = true;
01315                 }
01316               }
01317             }
01318             if(exception_caught) continue;
01319           }
01320           msgCount++;
01321           if(ret==MSQS_MESSAGE_TYPE_TRR) {
01322             TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
01323             if(trp->lumiSection > evtProcessor_.getLumiSectionReferenceIndex()){
01324               std::cout << "postpone handling of msg from slot " << i << " with Ls " <<  trp->lumiSection
01325                         << " should be " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01326               subs_[i].add_postponed_trigger_update(master_message_trr_);
01327             }else{
01328               atLeastOneProcessUpdatedSuccessfully = true;
01329               evtProcessor_.sumAndPackTriggerReport(master_message_trr_);
01330             }
01331           }
01332           else std::cout << "msgrcv returned error " << errno << std::endl;
01333         }
01334     }
01335   if(atLeastOneProcessUpdatedSuccessfully){
01336     nbSubProcessesReporting_.value_ = msgCount;
01337     evtProcessor_.getPackedTriggerReportAsStruct()->nbExpected = nbSubProcesses_.value_;
01338     evtProcessor_.getPackedTriggerReportAsStruct()->nbReporting = nbSubProcessesReporting_.value_;
01339     evtProcessor_.updateRollingReport();
01340     evtProcessor_.fireScalersUpdate();
01341   }
01342   else{
01343     LOG4CPLUS_WARN(getApplicationLogger(),"Summarize loop: no process updated successfully - sleep 10 seconds before trying again");          
01344     if(msgCount==0) evtProcessor_.withdrawLumiSectionIncrement();
01345     nbSubProcessesReporting_.value_ = 0;
01346     ::sleep(10);
01347   }
01348   if(fsm_.stateName()->toString()!="Enabled"){
01349     wlScalersActive_ = false;
01350     return false;
01351   }
01352   //  cpustat_->printStat();
01353   if(iDieStatisticsGathering_.value_){
01354     try{
01355       TriggerReportStatic *trsp = evtProcessor_.getPackedTriggerReportAsStruct();
01356       cpustat_ ->setNproc(trsp->eventSummary.totalEvents);
01357       cpustat_ ->sendStat(evtProcessor_.getLumiSectionReferenceIndex());
01358       ratestat_->sendStat((unsigned char*)trsp,
01359                           sizeof(TriggerReportStatic),
01360                           evtProcessor_.getLumiSectionReferenceIndex());
01361     }catch(evf::Exception &e){
01362       LOG4CPLUS_INFO(getApplicationLogger(),"coud not send statistics"
01363                      << e.what());
01364     }
01365   }
01366   cpustat_->reset();
01367   return true;
01368 }
01369 
01370 
01371 
01372 bool FUEventProcessor::receivingAndMonitor(toolbox::task::WorkLoop *)
01373 {
01374   try{
01375     myProcess_->rcvSlave(slave_message_monitoring_,true); //will receive only messages from Master
01376     switch(slave_message_monitoring_->mtype)
01377       {
01378       case MSQM_MESSAGE_TYPE_MCS:
01379         {
01380           xgi::Input *in = 0;
01381           xgi::Output out;
01382           evtProcessor_.microState(in,&out);
01383           MsgBuf msg1(out.str().size(),MSQS_MESSAGE_TYPE_MCR);
01384           strncpy(msg1->mtext,out.str().c_str(),out.str().size());
01385           myProcess_->postSlave(msg1,true);
01386           break;
01387         }
01388       
01389       case MSQM_MESSAGE_TYPE_PRG:
01390         {
01391           xdata::Serializable *dqmp = 0;
01392           xdata::UnsignedInteger32 *dqm = 0;
01393           evtProcessor_.monitoring(0);
01394           try{
01395             dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
01396           }  catch(xdata::exception::Exception e){}
01397           if(dqmp!=0)
01398             dqm = (xdata::UnsignedInteger32*)dqmp;
01399 
01400           //      monitorInfoSpace_->lock();  
01401           prg * data           = (prg*)slave_message_prr_->mtext;
01402           data->ls             = evtProcessor_.lsid_;
01403           data->eols           = evtProcessor_.lastLumiUsingEol_;
01404           data->ps             = evtProcessor_.psid_;
01405           data->nbp            = evtProcessor_->totalEvents();
01406           data->nba            = evtProcessor_->totalEventsPassed();
01407           data->Ms             = evtProcessor_.epMAltState_.value_;
01408           data->ms             = evtProcessor_.epmAltState_.value_;
01409           if(dqm) data->dqm    = dqm->value_; else data->dqm = 0;
01410           data->trp            = scalersUpdates_;
01411           //      monitorInfoSpace_->unlock();  
01412           myProcess_->postSlave(slave_message_prr_,true);
01413           if(exitOnError_.value_)
01414           { 
01415             // after each monitoring cycle check if we are in inconsistent state and exit if configured to do so  
01416             //      std::cout << getpid() << "receivingAndMonitor: trying to acquire stop lock " << std::endl;
01417             if(data->Ms == edm::event_processor::sStopping || data->Ms == edm::event_processor::sError) 
01418               { 
01419                 bool running = true;
01420                 int count = 0;
01421                 while(running){
01422                   int retval = pthread_mutex_lock(&stop_lock_);
01423                   if(retval != 0) perror("error");
01424                   running = fsm_.stateName()->toString()=="Enabled";
01425                   if(count>5) _exit(-1);
01426                   pthread_mutex_unlock(&stop_lock_);
01427                   if(running) {::sleep(1); count++;}
01428                 }
01429               }
01430             
01431           }
01432           //      scalersUpdates_++;
01433           break;
01434         }
01435       case MSQM_MESSAGE_TYPE_WEB:
01436         {
01437           xgi::Input  *in = 0;
01438           xgi::Output out;
01439           unsigned int bytesToSend = 0;
01440           MsgBuf msg1(NUMERIC_MESSAGE_SIZE,MSQS_MESSAGE_TYPE_WEB);
01441           std::string query = slave_message_monitoring_->mtext;
01442           size_t pos = query.find_first_of("&");
01443           std::string method;
01444           std::string args;
01445           if(pos!=std::string::npos)  
01446             {
01447               method = query.substr(0,pos);
01448               args = query.substr(pos+1,query.length()-pos-1);
01449             }
01450           else
01451             method=query;
01452 
01453           if(method=="Spotlight")
01454             {
01455               spotlightWebPage(in,&out);
01456             }
01457           else if(method=="procStat")
01458             {
01459               procStat(in,&out);
01460             }
01461           else if(method=="moduleWeb")
01462             {
01463               internal::MyCgi mycgi;
01464               boost::char_separator<char> sep(";");
01465               boost::tokenizer<boost::char_separator<char> > tokens(args, sep);
01466               for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
01467                    tok_iter != tokens.end(); ++tok_iter){
01468                 size_t pos = (*tok_iter).find_first_of("%");
01469                 if(pos != std::string::npos){
01470                   std::string first  = (*tok_iter).substr(0    ,                        pos);
01471                   std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
01472                   mycgi.getEnvironment()[first]=second;
01473                 }
01474               }
01475               moduleWeb(&mycgi,&out);
01476             }
01477           else if(method=="Default")
01478             {
01479               defaultWebPage(in,&out);
01480             }
01481           else 
01482             {
01483               out << "Error 404!!!!!!!!" << std::endl;
01484             }
01485 
01486 
01487           bytesToSend = out.str().size();
01488           unsigned int cycle = 0;
01489           if(bytesToSend==0)
01490             {
01491               snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", bytesToSend);
01492               myProcess_->postSlave(msg1,true);
01493             }
01494           while(bytesToSend !=0){
01495             unsigned int msgSize = bytesToSend>MAX_PIPE_BUFFER_SIZE ? MAX_PIPE_BUFFER_SIZE : bytesToSend;
01496             write(anonymousPipe_[PIPE_WRITE],
01497                   out.str().c_str()+MAX_PIPE_BUFFER_SIZE*cycle,
01498                   msgSize);
01499             snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", msgSize);
01500             myProcess_->postSlave(msg1,true);
01501             bytesToSend -= msgSize;
01502             cycle++;
01503           }
01504           break;
01505         }
01506       case MSQM_MESSAGE_TYPE_TRP:
01507         {
01508           break;
01509         }
01510       }
01511   }
01512   catch(evf::Exception &e){std::cout << "exception caught in recevingM: " << e.what() << std::endl;}
01513   return true;
01514 }
01515 
01516 bool FUEventProcessor::enableCommon()
01517 {
01518   try {    
01519     if(hasShMem_) attachDqmToShm();
01520     int sc = 0;
01521     evtProcessor_->clearCounters();
01522     if(isRunNumberSetter_)
01523       evtProcessor_->setRunNumber(runNumber_.value_);
01524     else
01525       evtProcessor_->declareRunNumber(runNumber_.value_);
01526     try{
01527       ::sleep(1);
01528       evtProcessor_->runAsync();
01529       sc = evtProcessor_->statusAsync();
01530     }
01531     catch(cms::Exception &e) {
01532       reasonForFailedState_ = e.explainSelf();
01533       fsm_.fireFailed(reasonForFailedState_,this);
01534       localLog(reasonForFailedState_);
01535       return false;
01536     }    
01537     catch(std::exception &e) {
01538       reasonForFailedState_  = e.what();
01539       fsm_.fireFailed(reasonForFailedState_,this);
01540       localLog(reasonForFailedState_);
01541       return false;
01542     }
01543     catch(...) {
01544       reasonForFailedState_ = "Unknown Exception";
01545       fsm_.fireFailed(reasonForFailedState_,this);
01546       localLog(reasonForFailedState_);
01547       return false;
01548     }
01549     if(sc != 0) {
01550       std::ostringstream oss;
01551       oss<<"EventProcessor::runAsync returned status code " << sc;
01552       reasonForFailedState_ = oss.str();
01553       fsm_.fireFailed(reasonForFailedState_,this);
01554       localLog(reasonForFailedState_);
01555       return false;
01556     }
01557   }
01558   catch (xcept::Exception &e) {
01559     reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
01560     fsm_.fireFailed(reasonForFailedState_,this);
01561     localLog(reasonForFailedState_);
01562     return false;
01563   }
01564   try{
01565     fsm_.fireEvent("EnableDone",this);
01566   }
01567   catch (xcept::Exception &e) {
01568     std::cout << "exception " << (std::string)e.what() << std::endl;
01569     throw;
01570   }
01571 
01572   return false;
01573 }
01574   
01575 bool FUEventProcessor::enableClassic()
01576 {
01577   bool retval = enableCommon();
01578   while(evtProcessor_->getState()!= edm::event_processor::sRunning){
01579     LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
01580     ::sleep(1);
01581   }
01582   
01583   //  implementation moved to EPWrapper
01584   //  startScalersWorkLoop(); // this is now not done any longer 
01585   localLog("-I- Start completed");
01586   return retval;
01587 }
01588 bool FUEventProcessor::enableMPEPSlave()
01589 {
01590   //all this happens only in the child process
01591   startReceivingLoop();
01592   startReceivingMonitorLoop();
01593   evtProcessor_.resetWaiting();
01594   startScalersWorkLoop();
01595   while(!evtProcessor_.isWaitingForLs())
01596     ::sleep(1);
01597   // @EM test do not run monitor loop in slave, only receiving&Monitor
01598   //  evtProcessor_.startMonitoringWorkLoop();
01599   try{
01600     //    evtProcessor_.makeServicesOnly();
01601     try{
01602       LOG4CPLUS_DEBUG(getApplicationLogger(),
01603                       "Trying to create message service presence ");
01604       edm::PresenceFactory *pf = edm::PresenceFactory::get();
01605       if(pf != 0) {
01606         pf->makePresence("MessageServicePresence").release();
01607       }
01608       else {
01609         LOG4CPLUS_ERROR(getApplicationLogger(),
01610                         "Unable to create message service presence ");
01611       }
01612     } 
01613     catch(...) {
01614       LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
01615     }
01616   ML::MLlog4cplus::setAppl(this);      
01617   }       
01618   catch (xcept::Exception &e) {
01619     reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
01620     fsm_.fireFailed(reasonForFailedState_,this);
01621     localLog(reasonForFailedState_);
01622   }
01623   bool retval =  enableCommon();
01624   //  while(evtProcessor_->getState()!= edm::event_processor::sRunning){
01625   //    LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
01626   //    ::sleep(1);
01627   //  }
01628   return retval;
01629 }
01630 
01631 bool FUEventProcessor::stopClassic()
01632 {
01633   try {
01634     LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping :) ...");
01635     edm::EventProcessor::StatusCode rc = evtProcessor_.stop();
01636     if(rc == edm::EventProcessor::epSuccess) 
01637       fsm_.fireEvent("StopDone",this);
01638     else
01639       {
01640         //      epMState_ = evtProcessor_->currentStateName();
01641         if(rc == edm::EventProcessor::epTimedOut)
01642           reasonForFailedState_ = "EventProcessor stop timed out";
01643         else
01644           reasonForFailedState_ = "EventProcessor did not receive STOP event";
01645         fsm_.fireFailed(reasonForFailedState_,this);
01646         localLog(reasonForFailedState_);
01647       }
01648     if(hasShMem_) detachDqmFromShm();
01649   }
01650   catch (xcept::Exception &e) {
01651     reasonForFailedState_ = "stopping FAILED: " + (std::string)e.what();
01652     localLog(reasonForFailedState_);
01653     fsm_.fireFailed(reasonForFailedState_,this);
01654   }
01655   LOG4CPLUS_INFO(getApplicationLogger(),"Finished stopping!");
01656   localLog("-I- Stop completed");
01657   return false;
01658 }
01659 
01660 void FUEventProcessor::stopSlavesAndAcknowledge()
01661 {
01662   MsgBuf msg(0,MSQM_MESSAGE_TYPE_STOP);
01663   MsgBuf msg1(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_STOP);
01664 
01665   std::vector<bool> processes_to_stop(nbSubProcesses_.value_,false);
01666   for(unsigned int i = 0; i < subs_.size(); i++)
01667     {
01668       pthread_mutex_lock(&stop_lock_);
01669       if(subs_[i].alive()>0){
01670         processes_to_stop[i] = true;
01671         subs_[i].post(msg,false);
01672       }
01673       pthread_mutex_unlock(&stop_lock_);
01674     }
01675   for(unsigned int i = 0; i < subs_.size(); i++)
01676     {
01677       pthread_mutex_lock(&stop_lock_);
01678       if(processes_to_stop[i]){
01679         try{
01680           subs_[i].rcv(msg1,false);
01681         }
01682         catch(evf::Exception &e){
01683           std::ostringstream ost;
01684           ost << "failed to get STOP - errno ->" << errno << " " << e.what(); 
01685           reasonForFailedState_ = ost.str();
01686           LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
01687           //      fsm_.fireFailed(reasonForFailedState_,this);
01688           localLog(reasonForFailedState_);
01689           pthread_mutex_unlock(&stop_lock_);
01690           continue;
01691         }
01692       }
01693       else {
01694         pthread_mutex_unlock(&stop_lock_);
01695         continue;
01696       }
01697       pthread_mutex_unlock(&stop_lock_);
01698       if(msg1->mtype==MSQS_MESSAGE_TYPE_STOP)
01699         while(subs_[i].alive()>0) ::usleep(10000);
01700       subs_[i].disconnect();
01701     }
01702   //  subs_.clear();
01703 
01704 }
01705 
01706 void FUEventProcessor::microState(xgi::Input *in,xgi::Output *out)
01707 {
01708   std::string urn = getApplicationDescriptor()->getURN();
01709   try{
01710     evtProcessor_.stateNameFromIndex(0);
01711     evtProcessor_.moduleNameFromIndex(0);
01712   if(myProcess_) {std::cout << "microstate called for child! bail out" << std::endl; return;}
01713   *out << "<tr><td>" << fsm_.stateName()->toString() 
01714        << "</td><td>"<< (myProcess_ ? "S" : "M") <<"</td><td>" << nblive_ << "</td><td>"
01715        << nbdead_ << "</td><td><a href=\"/" << urn << "/procStat\">" << getpid() <<"</a></td>";
01716   evtProcessor_.microState(in,out);
01717   *out << "<td></td><td>" << nbTotalDQM_ 
01718        << "</td><td>" << evtProcessor_.getScalersUpdates() << "</td></tr>";
01719   if(nbSubProcesses_.value_!=0 && !myProcess_) 
01720     {
01721       pthread_mutex_lock(&start_lock_);
01722       for(unsigned int i = 0; i < subs_.size(); i++)
01723         {
01724           try{
01725             if(subs_[i].alive()>0)
01726               {
01727                 *out << "<tr><td  bgcolor=\"#00FF00\" id=\"a"
01728                      << i << "\">""Alive</td><td>S</td><td>"
01729                      << subs_[i].queueId() << "<td>" 
01730                      << subs_[i].queueStatus()<< "/"
01731                      << subs_[i].queueOccupancy() << "/"
01732                      << subs_[i].queuePidOfLastSend() << "/"
01733                      << subs_[i].queuePidOfLastReceive() 
01734                      << "</td><td><a id=\"p"<< i << "\" href=\"SubWeb?process=" 
01735                      << subs_[i].pid() << "&method=procStat\">" 
01736                      << subs_[i].pid()<<"</a></td>" //<< msg->mtext;
01737                      << "<td>" << evtProcessor_.stateNameFromIndex(subs_[i].params().Ms) << "</td><td>" 
01738                      << evtProcessor_.moduleNameFromIndex(subs_[i].params().ms) << "</td><td>" 
01739                      << subs_[i].params().nba << "/" << subs_[i].params().nbp 
01740                      << " (" << float(subs_[i].params().nba)/float(subs_[i].params().nbp)*100. <<"%)" 
01741                      << "</td><td>" << subs_[i].params().ls  << "/" << subs_[i].params().ls 
01742                      << "</td><td>" << subs_[i].params().ps 
01743                      << "</td><td" 
01744                      << ((subs_[i].params().eols<subs_[i].params().ls) ? " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")  << ">" 
01745                      << subs_[i].params().eols  
01746                      << "</td><td>" << subs_[i].params().dqm 
01747                      << "</td><td>" << subs_[i].params().trp << "</td>";
01748               }
01749             else 
01750               {
01751                 pthread_mutex_lock(&pickup_lock_);
01752                 *out << "<tr><td id=\"a"<< i << "\" ";
01753                 if(subs_[i].alive()==-1000)
01754                   *out << " bgcolor=\"#bbaabb\">NotInitialized";
01755                 else
01756                   *out << (subs_[i].alive()==0 ? ">Done" : " bgcolor=\"#FF0000\">Dead");
01757                 *out << "</td><td>S</td><td>"<< subs_[i].queueId() << "<td>" 
01758                      << subs_[i].queueStatus() << "/"
01759                      << subs_[i].queueOccupancy() << "/"
01760                      << subs_[i].queuePidOfLastSend() << "/"
01761                      << subs_[i].queuePidOfLastReceive() 
01762                      << "</td><td id=\"p"<< i << "\">"
01763                      <<subs_[i].pid()<<"</td><td colspan=\"5\">" << subs_[i].reasonForFailed();
01764                 if(subs_[i].alive()!=0 && subs_[i].alive()!=-1000) 
01765                   {
01766                     if(autoRestartSlaves_ && subs_[i].restartCount()<=2) 
01767                       *out << " will restart in " << subs_[i].countdown() << " s";
01768                     else if(autoRestartSlaves_)
01769                       *out << " reached maximum restart count";
01770                     else *out << " autoRestart is disabled ";
01771                   }
01772                 *out << "</td><td" 
01773                      << ((subs_[i].params().eols<subs_[i].params().ls) ? 
01774                          " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")  
01775                      << ">" 
01776                      << subs_[i].params().eols  
01777                      << "</td><td>" << subs_[i].params().dqm 
01778                      << "</td><td>" << subs_[i].params().trp << "</td>";
01779                 pthread_mutex_unlock(&pickup_lock_);
01780               }
01781             *out << "</tr>";
01782           }
01783           catch(evf::Exception &e){
01784             *out << "<tr><td id=\"a"<< i << "\" " 
01785                  <<"bgcolor=\"#FFFF00\">NotResp</td><td>S</td><td>"<< subs_[i].queueId() << "<td>" 
01786                  << subs_[i].queueStatus() << "/"
01787                  << subs_[i].queueOccupancy() << "/"
01788                  << subs_[i].queuePidOfLastSend() << "/"
01789                  << subs_[i].queuePidOfLastReceive() 
01790                  << "</td><td id=\"p"<< i << "\">"
01791                  <<subs_[i].pid()<<"</td>";
01792           }
01793         }
01794       pthread_mutex_unlock(&start_lock_); 
01795     }
01796   }
01797       catch(evf::Exception &e)
01798       {
01799         LOG4CPLUS_INFO(getApplicationLogger(),"evf::Exception caught in microstate - " << e.what());    
01800       }
01801     catch(cms::Exception &e)
01802       {
01803         LOG4CPLUS_INFO(getApplicationLogger(),"cms::Exception caught in microstate - " << e.what());    
01804       }
01805     catch(std::exception &e)
01806       {
01807         LOG4CPLUS_INFO(getApplicationLogger(),"std::Exception caught in microstate - " << e.what());    
01808       }
01809     catch(...)
01810       {
01811         LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception caught in microstate - ");    
01812       }
01813 
01814 }
01815 
01816 
01817 void FUEventProcessor::updater(xgi::Input *in,xgi::Output *out)
01818 {
01819   using namespace utils;
01820 
01821   *out << updaterStatic_;
01822   mDiv(out,"loads");
01823   uptime(out);
01824   cDiv(out);
01825   mDiv(out,"st",fsm_.stateName()->toString());
01826   mDiv(out,"ru",runNumber_.toString());
01827   mDiv(out,"nsl",nbSubProcesses_.value_);
01828   mDiv(out,"nsr",nbSubProcessesReporting_.value_);
01829   mDiv(out,"cl");
01830   *out << getApplicationDescriptor()->getClassName() 
01831        << (nbSubProcesses_.value_ > 0 ? "MP " : " ");
01832   cDiv(out);
01833   mDiv(out,"in",getApplicationDescriptor()->getInstance());
01834   if(fsm_.stateName()->toString() != "Halted" && fsm_.stateName()->toString() != "halting"){
01835     mDiv(out,"hlt");
01836     *out << "<a href=\"" << configString_.toString() << "\">HLT Config</a>";
01837     cDiv(out);
01838     *out << std::endl;
01839   }
01840   else
01841     mDiv(out,"hlt","Not yet...");
01842 
01843   mDiv(out,"sq",squidPresent_.toString());
01844   mDiv(out,"vwl",(supervising_ ? "Active" : "Not Initialized"));
01845   mDiv(out,"mwl",evtProcessor_.wlMonitoring());
01846   if(nbProcessed != 0 && nbAccepted != 0)
01847     {
01848       mDiv(out,"tt",((xdata::UnsignedInteger32*)nbProcessed)->value_);
01849       mDiv(out,"ac",((xdata::UnsignedInteger32*)nbAccepted)->value_);
01850     }
01851   else
01852     {
01853       mDiv(out,"tt",0);
01854       mDiv(out,"ac",0);
01855     }
01856   if(!myProcess_)
01857     mDiv(out,"swl",(wlSummarizeActive_ ? "Active" : "Inactive"));
01858   else
01859     mDiv(out,"swl",(wlScalersActive_ ? "Active" : "Inactive"));
01860 
01861   mDiv(out,"idi",iDieUrl_.value_);
01862   if(vp_!=0){
01863     mDiv(out,"vpi",(unsigned int) vp_);
01864     if(vulture_->hasStarted()>=0)
01865       mDiv(out,"vul","Prowling");
01866     else
01867       mDiv(out,"vul","Dead");
01868   }
01869   else{
01870     mDiv(out,"vul",(vulture_==0 ? "Nope" : "Hatching"));
01871   }    
01872   if(evtProcessor_){
01873     mDiv(out,"ll");
01874     *out << evtProcessor_.lastLumi().ls
01875          << "," << evtProcessor_.lastLumi().proc << "," << evtProcessor_.lastLumi().acc;
01876     cDiv(out);
01877   }
01878   mDiv(out,"lg");
01879   for(unsigned int i = logRingIndex_; i<logRingSize_; i++)
01880     *out << logRing_[i] << std::endl;
01881   if(logWrap_)
01882     for(unsigned int i = 0; i<logRingIndex_; i++)
01883       *out << logRing_[i] << std::endl;
01884   cDiv(out);
01885   mDiv(out,"cha");
01886   if(cpustat_) *out << cpustat_->getChart();
01887   cDiv(out);
01888 }
01889 
01890 void FUEventProcessor::procStat(xgi::Input *in, xgi::Output *out)
01891 {
01892   evf::utils::procStat(out);
01893 }
01894 
01895 void FUEventProcessor::sendMessageOverMonitorQueue(MsgBuf &buf)
01896 {
01897   if(myProcess_) myProcess_->postSlave(buf,true);
01898 }
01899 
01900 void FUEventProcessor::makeStaticInfo()
01901 {
01902   using namespace utils;
01903   std::ostringstream ost;
01904   mDiv(&ost,"ve");
01905   ost<< "$Revision: 1.134 $ (" << edm::getReleaseVersion() <<")";
01906   cDiv(&ost);
01907   mDiv(&ost,"ou",outPut_.toString());
01908   mDiv(&ost,"sh",hasShMem_.toString());
01909   mDiv(&ost,"mw",hasModuleWebRegistry_.toString());
01910   mDiv(&ost,"sw",hasServiceWebRegistry_.toString());
01911   
01912   xdata::Serializable *monsleep = 0;
01913   xdata::Serializable *lstimeout = 0;
01914   try{
01915     monsleep  = applicationInfoSpace_->find("monSleepSec");
01916     lstimeout = applicationInfoSpace_->find("lsTimeOut");
01917   }
01918   catch(xdata::exception::Exception e){
01919   }
01920   
01921   if(monsleep!=0)
01922     mDiv(&ost,"ms",monsleep->toString());
01923   if(lstimeout!=0)
01924     mDiv(&ost,"lst",lstimeout->toString());
01925   char cbuf[sizeof(struct utsname)];
01926   struct utsname* buf = (struct utsname*)cbuf;
01927   uname(buf);
01928   mDiv(&ost,"sysinfo");
01929   ost << buf->sysname << " " << buf->nodename 
01930       << " " << buf->release << " " << buf->version << " " << buf->machine;
01931   cDiv(&ost);
01932   updaterStatic_ = ost.str();
01933 }
01934 
// NOTE(review): macro defined outside this file; presumably it emits the
// factory hook XDAQ uses to instantiate evf::FUEventProcessor - confirm
// against the XDAQ headers.
01935 XDAQ_INSTANTIATOR_IMPL(evf::FUEventProcessor)