CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch12/src/EventFilter/Processor/src/FUEventProcessor.cc

Go to the documentation of this file.
00001 
00002 //
00003 // FUEventProcessor
00004 // ----------------
00005 //
00007 
00008 #include "FUEventProcessor.h"
00009 #include "procUtils.h"
00010 #include "EventFilter/Utilities/interface/CPUStat.h"
00011 #include "EventFilter/Utilities/interface/RateStat.h"
00012 
00013 #include "EventFilter/Utilities/interface/Exception.h"
00014 
00015 #include "EventFilter/Message2log4cplus/interface/MLlog4cplus.h"
00016 #include "EventFilter/Modules/interface/FUShmDQMOutputService.h"
00017 #include "EventFilter/Utilities/interface/ServiceWebRegistry.h"
00018 #include "EventFilter/Utilities/interface/ServiceWeb.h"
00019 
00020 
00021 #include "FWCore/PluginManager/interface/ProblemTracker.h"
00022 #include "FWCore/PluginManager/interface/PresenceFactory.h"
00023 #include "FWCore/Utilities/interface/Presence.h"
00024 #include "FWCore/Utilities/interface/Exception.h"
00025 #include "FWCore/Version/interface/GetReleaseVersion.h"
00026 #include "FWCore/ServiceRegistry/interface/Service.h"
00027 
00028 #include <boost/tokenizer.hpp>
00029 
00030 #include "xcept/tools.h"
00031 #include "xgi/Method.h"
00032 
00033 #include "cgicc/CgiDefs.h"
00034 #include "cgicc/Cgicc.h"
00035 #include "cgicc/FormEntry.h"
00036 
00037 
00038 #include <sys/wait.h>
00039 #include <sys/utsname.h>
00040 
00041 #include <typeinfo>
00042 #include <stdlib.h>
00043 #include <stdio.h>
00044 #include <errno.h>
00045 
00046 using namespace evf;
00047 using namespace cgicc;
00048 
00049 
00051 // construction/destruction
00053 
00054 //______________________________________________________________________________
00055 FUEventProcessor::FUEventProcessor(xdaq::ApplicationStub *s) 
00056   : xdaq::Application(s)
00057   , fsm_(this)
00058   , log_(getApplicationLogger())
00059   , evtProcessor_(log_, getApplicationDescriptor()->getInstance())
00060   , runNumber_(0)
00061   , epInitialized_(false)
00062   , outPut_(true)
00063   , autoRestartSlaves_(false)
00064   , slaveRestartDelaySecs_(10)
00065   , hasShMem_(true)
00066   , hasPrescaleService_(true)
00067   , hasModuleWebRegistry_(true)
00068   , hasServiceWebRegistry_(true)
00069   , isRunNumberSetter_(true)
00070   , iDieStatisticsGathering_(false)
00071   , outprev_(true)
00072   , exitOnError_(true)
00073   , reasonForFailedState_()
00074   , squidnet_(3128,"http://localhost:8000/RELEASE-NOTES.txt")
00075   , logRing_(logRingSize_)
00076   , logRingIndex_(logRingSize_)
00077   , logWrap_(false)
00078   , nbSubProcesses_(0)
00079   , nbSubProcessesReporting_(0)
00080   , nblive_(0)
00081   , nbdead_(0)
00082   , nbTotalDQM_(0)
00083   , wlReceiving_(0)
00084   , asReceiveMsgAndExecute_(0)
00085   , receiving_(false) 
00086   , wlReceivingMonitor_(0)
00087   , asReceiveMsgAndRead_(0)
00088   , receivingM_(false)
00089   , myProcess_(0)
00090   , wlSupervising_(0)
00091   , asSupervisor_(0)
00092   , supervising_(false)
00093   , monitorInfoSpace_(0)
00094   , monitorLegendaInfoSpace_(0)
00095   , applicationInfoSpace_(0)
00096   , nbProcessed(0)
00097   , nbAccepted(0)
00098   , scalersInfoSpace_(0)
00099   , scalersLegendaInfoSpace_(0)
00100   , wlScalers_(0)
00101   , asScalers_(0)
00102   , wlScalersActive_(false)
00103   , scalersUpdates_(0)
00104   , wlSummarize_(0)
00105   , asSummarize_(0)
00106   , wlSummarizeActive_(false)
00107   , superSleepSec_(1)
00108   , iDieUrl_("none")
00109   , vulture_(0)
00110   , vp_(0)
00111   , cpustat_(0)
00112   , ratestat_(0)
00113 {
00114   using namespace utils;
00115 
00116   names_.push_back("nbProcessed"    );
00117   names_.push_back("nbAccepted"     );
00118   names_.push_back("epMacroStateInt");
00119   names_.push_back("epMicroStateInt");
00120   // create pipe for web communication
00121   int retpipe = pipe(anonymousPipe_);
00122   if(retpipe != 0)
00123         LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to create pipe");
00124   // check squid presence
00125   squidPresent_ = squidnet_.check();
00126   //pass application parameters to FWEPWrapper
00127   evtProcessor_.setAppDesc(getApplicationDescriptor());
00128   evtProcessor_.setAppCtxt(getApplicationContext());
00129   // bind relevant callbacks to finite state machine
00130   fsm_.initialize<evf::FUEventProcessor>(this);
00131   
00132   //set sourceId_
00133   url_ =
00134     getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00135     getApplicationDescriptor()->getURN();
00136   class_   =getApplicationDescriptor()->getClassName();
00137   instance_=getApplicationDescriptor()->getInstance();
00138   sourceId_=class_.toString()+"_"+instance_.toString();
00139   LOG4CPLUS_INFO(getApplicationLogger(),sourceId_ <<" constructor"         );
00140   LOG4CPLUS_INFO(getApplicationLogger(),"CMSSW_BASE:"<<getenv("CMSSW_BASE"));
00141   
00142   getApplicationDescriptor()->setAttribute("icon", "/evf/images/epicon.jpg");
00143   
00144   xdata::InfoSpace *ispace = getApplicationInfoSpace();
00145   applicationInfoSpace_ = ispace;
00146 
00147   // default configuration
00148   ispace->fireItemAvailable("parameterSet",         &configString_                );
00149   ispace->fireItemAvailable("epInitialized",        &epInitialized_               );
00150   ispace->fireItemAvailable("stateName",             fsm_.stateName()             );
00151   ispace->fireItemAvailable("runNumber",            &runNumber_                   );
00152   ispace->fireItemAvailable("outputEnabled",        &outPut_                      );
00153 
00154   ispace->fireItemAvailable("hasSharedMemory",      &hasShMem_);
00155   ispace->fireItemAvailable("hasPrescaleService",   &hasPrescaleService_          );
00156   ispace->fireItemAvailable("hasModuleWebRegistry", &hasModuleWebRegistry_        );
00157   ispace->fireItemAvailable("hasServiceWebRegistry", &hasServiceWebRegistry_      );
00158   ispace->fireItemAvailable("isRunNumberSetter",    &isRunNumberSetter_           );
00159   ispace->fireItemAvailable("iDieStatisticsGathering",   &iDieStatisticsGathering_);
00160   ispace->fireItemAvailable("rcmsStateListener",     fsm_.rcmsStateListener()     );
00161   ispace->fireItemAvailable("foundRcmsStateListener",fsm_.foundRcmsStateListener());
00162   ispace->fireItemAvailable("nbSubProcesses",       &nbSubProcesses_              );
00163   ispace->fireItemAvailable("nbSubProcessesReporting",&nbSubProcessesReporting_   );
00164   ispace->fireItemAvailable("superSleepSec",        &superSleepSec_               );
00165   ispace->fireItemAvailable("autoRestartSlaves",    &autoRestartSlaves_           );
00166   ispace->fireItemAvailable("slaveRestartDelaySecs",&slaveRestartDelaySecs_       );
00167   ispace->fireItemAvailable("iDieUrl",              &iDieUrl_                     );
00168   
00169   // Add infospace listeners for exporting data values
00170   getApplicationInfoSpace()->addItemChangedListener("parameterSet",        this);
00171   getApplicationInfoSpace()->addItemChangedListener("outputEnabled",       this);
00172 
00173   // findRcmsStateListener
00174   fsm_.findRcmsStateListener();
00175   
00176   // initialize monitoring infospace
00177 
00178   std::string monInfoSpaceName="evf-eventprocessor-status-monitor";
00179   toolbox::net::URN urn = this->createQualifiedInfoSpace(monInfoSpaceName);
00180   monitorInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00181 
00182   std::string monLegendaInfoSpaceName="evf-eventprocessor-status-legenda";
00183   urn = this->createQualifiedInfoSpace(monLegendaInfoSpaceName);
00184   monitorLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00185 
00186   
00187   monitorInfoSpace_->fireItemAvailable("url",                      &url_            );
00188   monitorInfoSpace_->fireItemAvailable("class",                    &class_          );
00189   monitorInfoSpace_->fireItemAvailable("instance",                 &instance_       );
00190   monitorInfoSpace_->fireItemAvailable("runNumber",                &runNumber_      );
00191   monitorInfoSpace_->fireItemAvailable("stateName",                 fsm_.stateName()); 
00192 
00193   monitorInfoSpace_->fireItemAvailable("squidPresent",             &squidPresent_   );
00194 
00195   std::string scalersInfoSpaceName="evf-eventprocessor-scalers-monitor";
00196   urn = this->createQualifiedInfoSpace(scalersInfoSpaceName);
00197   scalersInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00198 
00199   std::string scalersLegendaInfoSpaceName="evf-eventprocessor-scalers-legenda";
00200   urn = this->createQualifiedInfoSpace(scalersLegendaInfoSpaceName);
00201   scalersLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00202 
00203 
00204 
00205   evtProcessor_.setScalersInfoSpace(scalersInfoSpace_,scalersLegendaInfoSpace_);
00206   scalersInfoSpace_->fireItemAvailable("instance", &instance_);
00207 
00208   evtProcessor_.setApplicationInfoSpace(ispace);
00209   evtProcessor_.setMonitorInfoSpace(monitorInfoSpace_,monitorLegendaInfoSpace_);
00210   evtProcessor_.publishConfigAndMonitorItems(nbSubProcesses_.value_!=0);
00211 
00212   //subprocess state vectors for MP
00213   monitorInfoSpace_->fireItemAvailable("epMacroStateInt",             &spMStates_); 
00214   monitorInfoSpace_->fireItemAvailable("epMicroStateInt",             &spmStates_); 
00215   
00216   // Bind web interface
00217   xgi::bind(this, &FUEventProcessor::css,              "styles.css");
00218   xgi::bind(this, &FUEventProcessor::defaultWebPage,   "Default"   );
00219   xgi::bind(this, &FUEventProcessor::spotlightWebPage, "Spotlight" );
00220   xgi::bind(this, &FUEventProcessor::scalersWeb,       "scalersWeb");
00221   xgi::bind(this, &FUEventProcessor::pathNames,        "pathNames" );
00222   xgi::bind(this, &FUEventProcessor::subWeb,           "SubWeb"    );
00223   xgi::bind(this, &FUEventProcessor::moduleWeb,        "moduleWeb" );
00224   xgi::bind(this, &FUEventProcessor::serviceWeb,       "serviceWeb");
00225   xgi::bind(this, &FUEventProcessor::microState,       "microState");
00226   xgi::bind(this, &FUEventProcessor::updater,          "updater"   );
00227   xgi::bind(this, &FUEventProcessor::procStat,         "procStat"  );
00228 
00229   // instantiate the plugin manager, not referenced here after!
00230 
00231   edm::AssertHandler ah;
00232 
00233   try{
00234     LOG4CPLUS_DEBUG(getApplicationLogger(),
00235                     "Trying to create message service presence ");
00236     edm::PresenceFactory *pf = edm::PresenceFactory::get();
00237     if(pf != 0) {
00238       pf->makePresence("MessageServicePresence").release();
00239     }
00240     else {
00241       LOG4CPLUS_ERROR(getApplicationLogger(),
00242                       "Unable to create message service presence ");
00243     }
00244   } 
00245   catch(...) {
00246     LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
00247   }
00248   ML::MLlog4cplus::setAppl(this);      
00249 
00250   typedef std::set<xdaq::ApplicationDescriptor*> AppDescSet_t;
00251   typedef AppDescSet_t::iterator                 AppDescIter_t;
00252   
00253   AppDescSet_t rcms=
00254     getApplicationContext()->getDefaultZone()->
00255     getApplicationDescriptors("RCMSStateListener");
00256   if(rcms.size()==0) 
00257     {
00258       LOG4CPLUS_WARN(getApplicationLogger(),
00259                        "MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
00260       //        localLog("-W- MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
00261     }
00262   else
00263     {
00264       AppDescIter_t it = rcms.begin();
00265       evtProcessor_.setRcms(*it);
00266     }
00267   pthread_mutex_init(&start_lock_,0);
00268   pthread_mutex_init(&stop_lock_,0);
00269   pthread_mutex_init(&pickup_lock_,0);
00270 
00271   makeStaticInfo();
00272   startSupervisorLoop();  
00273 
00274   if(vulture_==0) vulture_ = new Vulture(true);
00275 
00277 
00278   AppDescSet_t setOfiDie=
00279     getApplicationContext()->getDefaultZone()->
00280     getApplicationDescriptors("evf::iDie");
00281   
00282   for (AppDescIter_t it=setOfiDie.begin();it!=setOfiDie.end();++it)
00283     if ((*it)->getInstance()==0) // there has to be only one instance of iDie
00284       iDieUrl_ = (*it)->getContextDescriptor()->getURL() + "/" + (*it)->getURN();
00285 }
00286 //___________here ends the *huge* constructor___________________________________
00287 
00288 
00289 //______________________________________________________________________________
      // Destructor: frees the helper objects this class allocates with new
      // (the Vulture created in the constructor and the CPUStat/RateStat
      // objects created in configuring()/enabling()).
00290 FUEventProcessor::~FUEventProcessor()
00291 {
00292   // no longer needed since the wrapper is a member of the class and one can rely on 
00293   // implicit destructor - to be revised - at any rate the most common exit path is via "kill"...
00294   //  if (evtProcessor_) delete evtProcessor_;
00295   if(vulture_ != 0) delete vulture_;
        // cpustat_ and ratestat_ start out as 0 and are only ever replaced by
        // objects obtained from new, so they can be released here as well.
        if(cpustat_ != 0) {delete cpustat_; cpustat_=0;}
        if(ratestat_ != 0) {delete ratestat_; ratestat_=0;}
00296 }
00297 
00298 
00300 // implementation of member functions
00302 
00303 
00304 //______________________________________________________________________________
      // FSM work-loop action for the "configure" transition.
      //
      // Sizes the per-subprocess state vectors, initialises the event processor
      // wrapper from the "parameterSet" string, runs beginJob(), (re)creates the
      // CPUStat/RateStat helpers and, on success, fires "ConfigureDone".
      // Any exception is turned into an FSM failure whose reason is kept in
      // reasonForFailedState_.
      //
      // @param wl  work loop invoking this action (unused).
      // @return    always false.
00305 bool FUEventProcessor::configuring(toolbox::task::WorkLoop* wl)
00306 {
00307 //   std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
00308 //          << ((instance_.value_==0) ? 0x8 : 0) << " "
00309 //          << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
00310 //          << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
00311 //          << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
        // bit map of the options handed to evtProcessor_.init():
        //   0x10 multi-process mode, 0x8 instance 0, 0x4 service web registry,
        //   0x2 module web registry, 0x1 prescale service
00312   unsigned short smap 
00313     = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00314     + ((instance_.value_==0) ? 0x8 : 0)
00315     + (hasServiceWebRegistry_.value_ ? 0x4 : 0) 
00316     + (hasModuleWebRegistry_.value_ ? 0x2 : 0) 
00317     + (hasPrescaleService_.value_ ? 0x1 : 0);
00318   if(nbSubProcesses_.value_==0) 
00319     {
00320       spMStates_.setSize(1); 
00321       spmStates_.setSize(1); 
00322     }
00323   else
00324     {
00325       spMStates_.setSize(nbSubProcesses_.value_);
00326       spmStates_.setSize(nbSubProcesses_.value_);
00327       for(unsigned int i = 0; i < spMStates_.size(); i++)
00328         {
00329           spMStates_[i] = edm::event_processor::sInit; 
00330           spmStates_[i] = 0; 
00331         }
00332     }
00333   try {
00334     LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ...");
00335     std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00336     epInitialized_=true;
          // NOTE(review): when the wrapper evaluates to false here no FSM event
          // is fired at all - confirm that init() cannot leave it in that state.
00337     if(evtProcessor_)
00338       {
00339         // moved to wrapper class
00340         configuration_ = evtProcessor_.configuration();
00341         if(nbSubProcesses_.value_==0) evtProcessor_.startMonitoringWorkLoop(); 
00342         evtProcessor_->beginJob(); 
00343         if(cpustat_) {delete cpustat_; cpustat_=0;}
00344         cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00345                                iDieUrl_.value_);
00346         if(ratestat_) {delete ratestat_; ratestat_=0;}
00347         ratestat_ = new RateStat(iDieUrl_.value_);
00348         if(iDieStatisticsGathering_.value_){
00349           try{
00350             cpustat_->sendLegenda(evtProcessor_.getmicromap());
00351             xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00352             if(legenda !=0){
00353               std::string slegenda = ((xdata::String*)legenda)->value_;
00354               ratestat_->sendLegenda(slegenda);
00355             }
00356 
00357           }
00358           catch(evf::Exception &e){
                  // failing to publish the legend is not fatal for configuration
00359             LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
00360                            << e.what());
00361           }
00362         }
00363         
00364         fsm_.fireEvent("ConfigureDone",this);
00365         LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!");
00366         localLog("-I- Configuration completed");
00367 
00368       }
00369   }
00370   catch (xcept::Exception &e) {
00371     reasonForFailedState_ = "configuring FAILED: " + (std::string)e.what();
00372     fsm_.fireFailed(reasonForFailedState_,this);
00373     localLog(reasonForFailedState_);
00374   }
00375   catch(cms::Exception &e) {
00376     reasonForFailedState_ = e.explainSelf();
00377     fsm_.fireFailed(reasonForFailedState_,this);
00378     localLog(reasonForFailedState_);
00379   }    
00380   catch(std::exception &e) {
00381     reasonForFailedState_ = e.what();
00382     fsm_.fireFailed(reasonForFailedState_,this);
00383     localLog(reasonForFailedState_);
00384   }
00385   catch(...) {
          // record and log the reason like the typed handlers above do, so a
          // stale message from an earlier failure is never reported
          reasonForFailedState_ = "Unknown Exception";
00386     fsm_.fireFailed(reasonForFailedState_,this);
          localLog(reasonForFailedState_);
00387   }
00388 
00389   if(vulture_!=0 && vp_ == 0) vp_ = vulture_->makeProcess();
00390 
00391   return false;
00392 }
00393 
00394 
00395 
00396 
00397 //______________________________________________________________________________
      // FSM work-loop action for the "enable" transition.
      //
      // Re-initialises the event processor when the configuration changed since
      // the last configure (epInitialized_ == false), then either starts the
      // single-process ("classic") mode or forks one child per configured
      // subprocess. In a child the function continues in enableMPEPSlave(); the
      // parent starts the summarize work loop and the vulture and fires
      // "EnableDone".
      //
      // @param wl  work loop invoking this action (unused).
      // @return    false in the parent; in classic mode / in a child whatever
      //            enableClassic() / enableMPEPSlave() return.
00398 bool FUEventProcessor::enabling(toolbox::task::WorkLoop* wl)
00399 {
00400   nbTotalDQM_ = 0;
00401   scalersUpdates_ = 0;
00402 //   std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
00403 //          << ((instance_.value_==0) ? 0x8 : 0) << " "
00404 //          << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
00405 //          << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
00406 //          << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
        // same option bit map as in configuring()
00407   unsigned short smap 
00408     = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00409     + ((instance_.value_==0) ? 0x8 : 0)
00410     + (hasServiceWebRegistry_.value_ ? 0x4 : 0) 
00411     + (hasModuleWebRegistry_.value_ ? 0x2 : 0) 
00412     + (hasPrescaleService_.value_ ? 0x1 : 0);
00413 
00414   LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling...");
00415   if(!epInitialized_){
00416     evtProcessor_.forceInitEventProcessorMaybe();
00417   }
00418   std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00419 
00420   if(!epInitialized_){
00421     evtProcessor_->beginJob(); 
00422     if(cpustat_) {delete cpustat_; cpustat_=0;}
00423     cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00424                            iDieUrl_.value_);
          // (re)create the rate statistics sender BEFORE the legend is sent:
          // the block below dereferences ratestat_, which is 0 until the first
          // successful configure and stale afterwards.
          if(ratestat_) {delete ratestat_; ratestat_=0;}
          ratestat_ = new RateStat(iDieUrl_.value_);
00425     if(iDieStatisticsGathering_.value_){
00426       try{
00427         cpustat_->sendLegenda(evtProcessor_.getmicromap());
              // the legend is looked up in the legenda infospace, as done in
              // configuring() and pathNames()
00428         xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00429         if(legenda !=0){
00430           std::string slegenda = ((xdata::String*)legenda)->value_;
00431           ratestat_->sendLegenda(slegenda);
00432        }
00433       }
00434       catch(evf::Exception &e){
00435         LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
00436                        << e.what());
00437       }
00438     }
00441     epInitialized_ = true;
00442   }
00443   configuration_ = evtProcessor_.configuration(); // get it again after init has been carried out...
00444   evtProcessor_.resetLumiSectionReferenceIndex();
00445   //classic appl will return here 
00446   if(nbSubProcesses_.value_==0) return enableClassic();
00447   //protect manipulation of subprocess array
00448   pthread_mutex_lock(&start_lock_);
00449 //   subs_.clear();
00450 //   subs_.resize(nbSubProcesses_.value_); // this should not be necessary
00451   pid_t retval = -1;
00452   for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00453     {
00454       subs_[i]=SubProcess(i,retval); //this will replace all the scattered variables
00455     }
00456   pthread_mutex_unlock(&start_lock_);
00457 
00458   for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00459     {
00460       retval = subs_[i].forkNew();
00461       if(retval==0)
00462         {
                // child process: remember which slot we are and give the child
                // a freshly initialised stop_lock_ of its own
00463           myProcess_ = &subs_[i];
00464           int rc = pthread_mutex_destroy(&stop_lock_);
00465           if(rc != 0) perror("error");
00466           rc = pthread_mutex_init(&stop_lock_,0);
00467           if(rc != 0) perror("error");
00468           fsm_.disableRcmsStateNotification();
00469           return enableMPEPSlave();
00470           // the loop is broken in the child 
00471         }
00472     }
00473   
00474   startSummarizeWorkLoop();
00475   vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00476 
00477   LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00478   fsm_.fireEvent("EnableDone",this);
00479   localLog("-I- Start completed");
00480   return false;
00481 }
00482 
00483 
00484 //______________________________________________________________________________
      // FSM work-loop action for the "stop" transition: stops the slaves when
      // running with subprocesses, stops the vulture, and then delegates to
      // stopClassic(), whose result is returned.
00485 bool FUEventProcessor::stopping(toolbox::task::WorkLoop* wl)
00486 {
        const bool runsSlaves = (nbSubProcesses_.value_ != 0);
        if(runsSlaves)
          {
            stopSlavesAndAcknowledge();
          }
00489   vulture_->stop();
        const bool outcome = stopClassic();
        return outcome;
00491 }
00492 
00493 
00494 //______________________________________________________________________________
      // FSM work-loop action for the "halt" transition.
      //
      // Stops the slaves when running with subprocesses, asks the wrapper to
      // stop and halt the event processor, and fires "HaltDone".
      //
      // @param wl  work loop invoking this action (unused).
      // @return    always false.
00495 bool FUEventProcessor::halting(toolbox::task::WorkLoop* wl)
00496 {
00497   LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ...");
00498   if(nbSubProcesses_.value_!=0) 
00499     stopSlavesAndAcknowledge();
00500   try{
00501     evtProcessor_.stopAndHalt();
00502   }
00503   catch (evf::Exception &e) {
          // only evf::Exception is handled here; anything else propagates
00504     reasonForFailedState_ = "halting FAILED: " + (std::string)e.what();
00505     localLog(reasonForFailedState_);
00506     fsm_.fireFailed(reasonForFailedState_,this);
00507   }
00508   //  if(hasShMem_) detachDqmFromShm();
00509 
        // NOTE(review): "HaltDone" is fired even after fireFailed() above -
        // confirm the state machine tolerates that sequence.
00510   LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!");
00511   fsm_.fireEvent("HaltDone",this);
00512 
00513   localLog("-I- Halt completed");
00514   return false;
00515 }
00516 
00517 
00518 //______________________________________________________________________________
      // SOAP entry point for state-machine commands: the message is handed to
      // the state machine unchanged and its reply is passed back to the caller.
00519 xoap::MessageReference FUEventProcessor::fsmCallback(xoap::MessageReference msg)
00520   throw (xoap::exception::Exception)
00521 {
        xoap::MessageReference reply = fsm_.commandCallback(msg);
        return reply;
00523 }
00524 
00525 
00526 //______________________________________________________________________________
      // Infospace listener callback.
      //
      // Reacts to "ItemChangedEvent" notifications received while the state
      // machine is not in "Halted":
      //  - "parameterSet"         : flags the event processor for re-initialisation
      //  - "outputEnabled"        : forwards a changed value to enableEndPaths()
      //  - "globalInputPrescale" /
      //    "globalOutputPrescale" : only logs that the setting has no effect
      //
      // @param e  the event delivered by the infospace.
00527 void FUEventProcessor::actionPerformed(xdata::Event& e)
00528 {
        // everything else is ignored
        if (e.type()!="ItemChangedEvent" || fsm_.stateName()->toString()=="Halted")
          return;

        const std::string changed = dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();

        if (changed == "parameterSet")
          {
            LOG4CPLUS_WARN(getApplicationLogger(),
                           "HLT Menu changed, force reinitialization of EventProcessor");
            epInitialized_ = false;
          }
        else if (changed == "outputEnabled")
          {
            // act only on a real change with respect to the last applied value
            if(outprev_ != outPut_)
              {
                const char *verb = (outprev_ ? "Disabling " : "Enabling ");
                LOG4CPLUS_WARN(getApplicationLogger(), verb<<"global output");
                evtProcessor_->enableEndPaths(outPut_);
                outprev_ = outPut_;
              }
          }
        else if (changed == "globalInputPrescale")
          {
            LOG4CPLUS_WARN(getApplicationLogger(),
                           "Setting global input prescale has no effect "
                           <<"in this version of the code");
          }
        else if (changed == "globalOutputPrescale")
          {
            LOG4CPLUS_WARN(getApplicationLogger(),
                           "Setting global output prescale has no effect "
                           <<"in this version of the code");
          }
00558 }
00559 
00560 //______________________________________________________________________________
      // Web callback that relays a request to one of the subprocesses.
      //
      // The CGI form fields "method" and "process" select the web method and
      // the pid of the subprocess. The method name plus the serialised CGI
      // environment is posted to that subprocess; the reply is then collected
      // in chunks: each chunk length arrives as text on the message queue and
      // the chunk itself is read from the anonymous pipe. The chunks are
      // finally streamed to the HTTP output.
      //
      // @param in   incoming request.
      // @param out  HTTP response stream; receives either the relayed page or
      //             an "ERROR 404"/"ERROR 405" line when the pid is unknown or
      //             the process is not alive.
00561 void FUEventProcessor::subWeb(xgi::Input  *in, xgi::Output *out)
00562 {
00563   using namespace cgicc;
00564   pid_t pid = 0;
00565   std::ostringstream ost;
00566   ost << "&";
00567 
00568   Cgicc cgi(in);
00569   internal::MyCgi *mycgi = (internal::MyCgi*)in;
        // serialise the CGI environment as "&key%value;key%value;..."
00570   for(std::map<std::string, std::string, std::less<std::string> >::iterator mit = 
00571         mycgi->getEnvironment().begin();
00572       mit != mycgi->getEnvironment().end(); mit++)
00573     ost << mit->first << "%" << mit->second << ";";
00575   std::vector<FormEntry> el1;
00576   cgi.getElement("method",el1);
00577   std::vector<FormEntry> el2;
00578   cgi.getElement("process",el2);
00579   if(el1.size()!=0) {
00580     std::string meth = el1[0].getValue();
00581     if(el2.size()!=0) {
00582       unsigned int i = 0;
00583       std::string mod = el2[0].getValue();
00584       pid = atoi(mod.c_str()); // get the process id to be polled
00585       for(; i < subs_.size(); i++)
00586         if(subs_[i].pid()==pid) break;
00587       if(i>=subs_.size()){ //process was not found, let the browser know
00588         *out << "ERROR 404 : Process " << pid << " Not Found !" << std::endl;
00589         return;
00590       } 
00591       if(subs_[i].alive() != 1){
00592         *out << "ERROR 405 : Process " << pid << " Not Alive !" << std::endl;
00593         return;
00594       }
            // NOTE(review): the two strcpy calls below write one byte (the
            // terminator) more than the size requested here - confirm MsgBuf
            // reserves room for it.
00595       MsgBuf msg1(meth.length()+ost.str().length(),MSQM_MESSAGE_TYPE_WEB);
00596       strcpy(msg1->mtext,meth.c_str());
00597       strcpy(msg1->mtext+meth.length(),ost.str().c_str());
00598       subs_[i].post(msg1,true);
            // the supervisor sleep time is stretched tenfold while the reply is
            // being collected and restored afterwards
00599       unsigned int keep_supersleep_original_value = superSleepSec_.value_;
00600       superSleepSec_.value_=10*keep_supersleep_original_value;
00601       MsgBuf msg(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_WEB);
00602       bool done = false;
00603       std::vector<char *>pieces;
00604       while(!done){
00605         unsigned long retval1 = subs_[i].rcvNonBlocking(msg,true);
00606         if(retval1 == MSGQ_MESSAGE_TYPE_RANGE){
                // nothing queued yet: wait and poll again
00607 	  ::sleep(1);
00608           continue;
00609         }
              // the chunk length arrives as decimal text; a negative value must
              // not be turned into a huge unsigned allocation size
              const int announced = atoi(msg->mtext);
              unsigned int nbytes = (announced > 0) ? static_cast<unsigned int>(announced) : 0;
00611         if(nbytes < MAX_PIPE_BUFFER_SIZE) done = true; // this will break the while loop
              // one extra byte: the chunk is streamed below as a C string and
              // read() does not append a terminator
              char *buf= new char[nbytes+1];
00613         ssize_t retval = read(anonymousPipe_[PIPE_READ],buf,nbytes);
              if(retval < 0 || static_cast<size_t>(retval) != nbytes) std::cout 
00615           << "CAREFUL HERE, read less bytes than expected from pipe in subWeb" << std::endl;
              // terminate after the bytes actually read (none if read() failed)
              buf[retval > 0 ? retval : 0] = '\0';
00616         pieces.push_back(buf);
00617       }
00618       superSleepSec_.value_=keep_supersleep_original_value;
00619       for(unsigned int j = 0; j < pieces.size(); j++){
00620         *out<<pieces[j];    // chain the buffers into the output strstream
00621         delete[] pieces[j]; //make sure to release all buffers used for reading the pipe
00622       }
00623     }
00624   }
00625 }
00626 
00627 
00628 //______________________________________________________________________________
      // Default web page: emits a minimal HTML document whose title carries the
      // application class name (followed by "MP " when subprocesses are
      // configured) and the instance, and which immediately redirects the
      // browser to /evf/html/defaultBasePage.html.
00629 void FUEventProcessor::defaultWebPage(xgi::Input  *in, xgi::Output *out)
00630   throw (xgi::exception::Exception)
00631 {
        // title separator: marks multi-process applications
        const char *titleTag = (nbSubProcesses_.value_ > 0 ? "MP " : " ");

        *out << "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">";
        *out << "<html><head><title>";
        *out << getApplicationDescriptor()->getClassName() << titleTag;
        *out << getApplicationDescriptor()->getInstance() << "</title>";
        *out << "<meta http-equiv=\"REFRESH\" content=\"0;url=/evf/html/defaultBasePage.html\">";
        *out << "</head></html>";
00639 }
00640 
00641 
00642 //______________________________________________________________________________
      // "Spotlight" web page.
      //
      // Writes an HTML page consisting of a header table (application icon,
      // class name, instance and current FSM state, links to HyperDAQ and to
      // the application's main page), followed by the task or summary page
      // produced by the event processor wrapper and a text area showing the
      // current configuration.
      //
      // @param in   incoming request, forwarded to the wrapper.
      // @param out  HTTP response stream the page is written to.
00643 
00644 
00645 void FUEventProcessor::spotlightWebPage(xgi::Input  *in, xgi::Output *out)
00646   throw (xgi::exception::Exception)
00647 {
00648 
00649   std::string urn = getApplicationDescriptor()->getURN();
00650 
00651   *out << "<!-- base href=\"/" <<  urn
00652        << "\"> -->" << std::endl;
00653   *out << "<html>"                                                   << std::endl;
00654   *out << "<head>"                                                   << std::endl;
00655   *out << "<link type=\"text/css\" rel=\"stylesheet\"";
00656   *out << " href=\"/evf/html/styles.css\"/>"                   << std::endl;
00657   *out << "<title>" << getApplicationDescriptor()->getClassName() 
00658        << getApplicationDescriptor()->getInstance() 
00659        << " MAIN</title>"     << std::endl;
00660   *out << "</head>"                                                  << std::endl;
00661   *out << "<body>"                                                   << std::endl;
        // header table: one row with icon/name/state and the navigation links
00662   *out << "<table border=\"0\" width=\"100%\">"                      << std::endl;
00663   *out << "<tr>"                                                     << std::endl;
00664   *out << "  <td align=\"left\">"                                    << std::endl;
00665   *out << "    <img"                                                 << std::endl;
00666   *out << "     align=\"middle\""                                    << std::endl;
00667   *out << "     src=\"/evf/images/spoticon.jpg\""                            << std::endl;
00668   *out << "     alt=\"main\""                                        << std::endl;
00669   *out << "     width=\"64\""                                        << std::endl;
00670   *out << "     height=\"64\""                                       << std::endl;
00671   *out << "     border=\"\"/>"                                       << std::endl;
00672   *out << "    <b>"                                                  << std::endl;
00673   *out << getApplicationDescriptor()->getClassName() 
00674        << getApplicationDescriptor()->getInstance()                  << std::endl;
00675   *out << "      " << fsm_.stateName()->toString()                   << std::endl;
00676   *out << "    </b>"                                                 << std::endl;
00677   *out << "  </td>"                                                  << std::endl;
00678   *out << "  <td width=\"32\">"                                      << std::endl;
00679   *out << "    <a href=\"/urn:xdaq-application:lid=3\">"             << std::endl;
00680   *out << "      <img"                                               << std::endl;
00681   *out << "       align=\"middle\""                                  << std::endl;
00682   *out << "       src=\"/hyperdaq/images/HyperDAQ.jpg\""             << std::endl;
00683   *out << "       alt=\"HyperDAQ\""                                  << std::endl;
00684   *out << "       width=\"32\""                                      << std::endl;
00685   *out << "       height=\"32\""                                     << std::endl;
00686   *out << "       border=\"\"/>"                                     << std::endl;
00687   *out << "    </a>"                                                 << std::endl;
00688   *out << "  </td>"                                                  << std::endl;
00689   *out << "  <td width=\"32\">"                                      << std::endl;
00690   *out << "  </td>"                                                  << std::endl;
00691   *out << "  <td width=\"32\">"                                      << std::endl;
00692   *out << "    <a href=\"/" << urn << "/\">"                         << std::endl;
00693   *out << "      <img"                                               << std::endl;
00694   *out << "       align=\"middle\""                                  << std::endl;
00695   *out << "       src=\"/evf/images/epicon.jpg\""                    << std::endl;
00696   *out << "       alt=\"main\""                                      << std::endl;
00697   *out << "       width=\"32\""                                      << std::endl;
00698   *out << "       height=\"32\""                                     << std::endl;
00699   *out << "       border=\"\"/>"                                     << std::endl;
00700   *out << "    </a>"                                                 << std::endl;
00701   *out << "  </td>"                                                  << std::endl;
00702   *out << "</tr>"                                                    << std::endl;
00703   *out << "</table>"                                                 << std::endl;
00704 
00705   *out << "<hr/>"                                                    << std::endl;
00706   
        // build the URL prefix handed to the wrapper: a subprocess routes module
        // pages through SubWeb with its own pid, otherwise moduleWeb is used
00707   std::ostringstream ost;
00708   if(myProcess_) 
00709     ost << "/SubWeb?process=" << getpid() << "&method=moduleWeb&";
00710   else
00711     ost << "/moduleWeb?";
00712   urn += ost.str();
        // task page in a subprocess or in single-process mode, summary page in
        // the parent of a multi-process application
00713   if(evtProcessor_ && (myProcess_ || nbSubProcesses_.value_==0))
00714     evtProcessor_.taskWebPage(in,out,urn);
00715   else if(evtProcessor_)
00716     evtProcessor_.summaryWebPage(in,out,urn);
00717   else
00718     *out << "<td>HLT Unconfigured</td>" << std::endl;
        // NOTE(review): this closes a table that was not opened in this
        // function - presumably opened by the wrapper pages; confirm.
00719   *out << "</table>"                                                 << std::endl;
00720   
00721   *out << "<br><textarea rows=" << 10 << " cols=80 scroll=yes>"      << std::endl;
00722   *out << configuration_                                             << std::endl;
00723   *out << "</textarea><P>"                                           << std::endl;
00724   
00725   *out << "</body>"                                                  << std::endl;
00726   *out << "</html>"                                                  << std::endl;
00727 
00728 
00729 }
00730 void FUEventProcessor::scalersWeb(xgi::Input  *in, xgi::Output *out)
00731   throw (xgi::exception::Exception)
00732 {
00733 
00734   out->getHTTPResponseHeader().addHeader( "Content-Type",
00735                                           "application/octet-stream" );
00736   out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
00737                                           "binary" );
00738   if(evtProcessor_ != 0){
00739     out->write( (char*)(evtProcessor_.getPackedTriggerReportAsStruct()), sizeof(TriggerReportStatic) );
00740   }
00741 }
00742 
00743 void FUEventProcessor::pathNames(xgi::Input  *in, xgi::Output *out)
00744   throw (xgi::exception::Exception)
00745 {
00746 
00747   if(evtProcessor_ != 0){
00748     xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00749     if(legenda !=0){
00750       std::string slegenda = ((xdata::String*)legenda)->value_;
00751       *out << slegenda << std::endl;
00752     }
00753   }
00754 }
00755 
00756 void FUEventProcessor::attachDqmToShm() throw (evf::Exception)  
00757 {
00758   std::string errmsg;
00759   bool success = false;
00760   try {
00761     edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
00762     if(edm::Service<FUShmDQMOutputService>().isAvailable())
00763       success = edm::Service<FUShmDQMOutputService>()->attachToShm();
00764     if (!success) errmsg = "Failed to attach DQM service to shared memory";
00765   }
00766   catch (cms::Exception& e) {
00767     errmsg = "Failed to attach DQM service to shared memory: " + (std::string)e.what();
00768   }
00769   if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
00770 }
00771 
00772 
00773 
00774 void FUEventProcessor::detachDqmFromShm() throw (evf::Exception)
00775 {
00776   std::string errmsg;
00777   bool success = false;
00778   try {
00779     edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
00780     if(edm::Service<FUShmDQMOutputService>().isAvailable())
00781       success = edm::Service<FUShmDQMOutputService>()->detachFromShm();
00782     if (!success) errmsg = "Failed to detach DQM service from shared memory";
00783   }
00784   catch (cms::Exception& e) {
00785     errmsg = "Failed to detach DQM service from shared memory: " + (std::string)e.what();
00786   }
00787   if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
00788 }
00789 
00790 
00791 std::string FUEventProcessor::logsAsString()
00792 {
00793   std::ostringstream oss;
00794   if(logWrap_)
00795     {
00796       for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
00797         oss << logRing_[i] << std::endl;
00798       for(unsigned int i = 0; i <  logRingIndex_; i++)
00799         oss << logRing_[i] << std::endl;
00800     }
00801   else
00802       for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
00803         oss << logRing_[i] << std::endl;
00804     
00805   return oss.str();
00806 }
00807   
00808 void FUEventProcessor::localLog(std::string m)
00809 {
00810   timeval tv;
00811 
00812   gettimeofday(&tv,0);
00813   tm *uptm = localtime(&tv.tv_sec);
00814   char datestring[256];
00815   strftime(datestring, sizeof(datestring),"%c", uptm);
00816 
00817   if(logRingIndex_ == 0){logWrap_ = true; logRingIndex_ = logRingSize_;}
00818   logRingIndex_--;
00819   std::ostringstream timestamp;
00820   timestamp << " at " << datestring;
00821   m += timestamp.str();
00822   logRing_[logRingIndex_] = m;
00823 }
00824 
00825 void FUEventProcessor::startSupervisorLoop()
00826 {
00827   try {
00828     wlSupervising_=
00829       toolbox::task::getWorkLoopFactory()->getWorkLoop("Supervisor",
00830                                                        "waiting");
00831     if (!wlSupervising_->isActive()) wlSupervising_->activate();
00832     asSupervisor_ = toolbox::task::bind(this,&FUEventProcessor::supervisor,
00833                                         "Supervisor");
00834     wlSupervising_->submit(asSupervisor_);
00835     supervising_ = true;
00836   }
00837   catch (xcept::Exception& e) {
00838     std::string msg = "Failed to start workloop 'Supervisor'.";
00839     XCEPT_RETHROW(evf::Exception,msg,e);
00840   }
00841 }
00842 
00843 void FUEventProcessor::startReceivingLoop()
00844 {
00845   try {
00846     wlReceiving_=
00847       toolbox::task::getWorkLoopFactory()->getWorkLoop("Receiving",
00848                                                        "waiting");
00849     if (!wlReceiving_->isActive()) wlReceiving_->activate();
00850     asReceiveMsgAndExecute_ = toolbox::task::bind(this,&FUEventProcessor::receiving,
00851                                         "Receiving");
00852     wlReceiving_->submit(asReceiveMsgAndExecute_);
00853     receiving_ = true;
00854   }
00855   catch (xcept::Exception& e) {
00856     std::string msg = "Failed to start workloop 'Receiving'.";
00857     XCEPT_RETHROW(evf::Exception,msg,e);
00858   }
00859 }
00860 void FUEventProcessor::startReceivingMonitorLoop()
00861 {
00862   try {
00863     wlReceivingMonitor_=
00864       toolbox::task::getWorkLoopFactory()->getWorkLoop("ReceivingM",
00865                                                        "waiting");
00866     if (!wlReceivingMonitor_->isActive()) 
00867       wlReceivingMonitor_->activate();
00868     asReceiveMsgAndRead_ = 
00869       toolbox::task::bind(this,&FUEventProcessor::receivingAndMonitor,
00870                           "ReceivingM");
00871     wlReceivingMonitor_->submit(asReceiveMsgAndRead_);
00872     receivingM_ = true;
00873   }
00874   catch (xcept::Exception& e) {
00875     std::string msg = "Failed to start workloop 'ReceivingM'.";
00876     XCEPT_RETHROW(evf::Exception,msg,e);
00877   }
00878 }
00879 
00880 bool FUEventProcessor::receiving(toolbox::task::WorkLoop *)
00881 {
00882   MsgBuf msg;
00883   try{
00884     myProcess_->rcvSlave(msg,false); //will receive only messages from Master
00885     if(msg->mtype==MSQM_MESSAGE_TYPE_STOP)
00886       {
00887         pthread_mutex_lock(&stop_lock_);
00888         fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
00889         try{
00890           LOG4CPLUS_DEBUG(getApplicationLogger(),
00891                           "Trying to create message service presence ");
00892           edm::PresenceFactory *pf = edm::PresenceFactory::get();
00893           if(pf != 0) {
00894             pf->makePresence("MessageServicePresence").release();
00895           }
00896           else {
00897             LOG4CPLUS_ERROR(getApplicationLogger(),
00898                             "Unable to create message service presence ");
00899           }
00900         } 
00901         catch(...) {
00902           LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
00903         }
00904         stopClassic(); // call the normal sequence of stopping - as this is allowed to fail provisions must be made ...@@@EM
00905         MsgBuf msg1(0,MSQS_MESSAGE_TYPE_STOP);
00906         myProcess_->postSlave(msg1,false);
00907         pthread_mutex_unlock(&stop_lock_);
00908         fclose(stdout);
00909         fclose(stderr);
00910         exit(EXIT_SUCCESS);
00911       }
00912   }
00913   catch(evf::Exception &e){}
00914   return true;
00915 }
00916 
00917 bool FUEventProcessor::supervisor(toolbox::task::WorkLoop *)
00918 {
00919   pthread_mutex_lock(&stop_lock_);
00920   if(subs_.size()!=nbSubProcesses_.value_)
00921     {
00922       subs_.resize(nbSubProcesses_.value_);
00923       spMStates_.resize(nbSubProcesses_.value_);
00924       spmStates_.resize(nbSubProcesses_.value_);
00925       for(unsigned int i = 0; i < spMStates_.size(); i++)
00926         {
00927           spMStates_[i] = edm::event_processor::sInit; 
00928           spmStates_[i] = 0; 
00929         }
00930     }
00931   bool running = fsm_.stateName()->toString()=="Enabled";
00932   bool stopping = fsm_.stateName()->toString()=="stopping";
00933   for(unsigned int i = 0; i < subs_.size(); i++)
00934     {
00935       if(subs_[i].alive()==-1000) continue;
00936       int sl;
00937 
00938       pid_t killedOrNot = waitpid(subs_[i].pid(),&sl,WNOHANG);
00939 
00940       if(killedOrNot==subs_[i].pid()) subs_[i].setStatus((WIFEXITED(sl) != 0 ? 0 : -1));
00941       else continue;
00942       pthread_mutex_lock(&pickup_lock_);
00943       std::ostringstream ost;
00944       if(subs_[i].alive()==0) ost << " process exited with status " << WEXITSTATUS(sl);
00945       else if(WIFSIGNALED(sl)!=0) ost << " process terminated with signal " << WTERMSIG(sl);
00946       else ost << " process stopped ";
00947       subs_[i].countdown()=slaveRestartDelaySecs_.value_;
00948       subs_[i].setReasonForFailed(ost.str());
00949       spMStates_[i] = evtProcessor_.notstarted_state_code();
00950       spmStates_[i] = 0;
00951       std::ostringstream ost1;
00952       ost1 << "-E- Slave " << subs_[i].pid() << ost.str();
00953       localLog(ost1.str());
00954       if(!autoRestartSlaves_.value_) subs_[i].disconnect();
00955       pthread_mutex_unlock(&pickup_lock_);
00956     }
00957   pthread_mutex_unlock(&stop_lock_);    
00958   if(stopping) return true; // if in stopping we are done
00959 
00960   if(running)
00961     {
00962       // if enabled, this loop will periodically check if dead slaves countdown has expired and restart them
00963       // this is only active while running, hence, the stop lock is acquired and only released at end of loop
00964       if(autoRestartSlaves_.value_){
00965         pthread_mutex_lock(&stop_lock_); //lockout slave killing at stop while you check for restarts
00966         for(unsigned int i = 0; i < subs_.size(); i++)
00967           {
00968             if(subs_[i].alive() != 1){
00969               if(subs_[i].countdown() == 0)
00970                 {
00971                   if(subs_[i].restartCount()>2){
00972                     LOG4CPLUS_WARN(getApplicationLogger()," Not restarting subprocess in slot " << i 
00973                                    << " - maximum restart count reached");
00974                     std::ostringstream ost1;
00975                     ost1 << "-W- Dead Process in slot " << i << " reached maximum restart count"; 
00976                     localLog(ost1.str());
00977                     subs_[i].countdown()--;
00978                     XCEPT_DECLARE(evf::Exception,
00979                                   sentinelException, ost1.str());
00980                     notifyQualified("error",sentinelException);
00981                     continue;
00982                   }
00983                   subs_[i].restartCount()++;
00984                   pid_t rr = subs_[i].forkNew();
00985                   if(rr==0)
00986                     {
00987                       myProcess_=&subs_[i];
00988                       scalersUpdates_ = 0;
00989                       int retval = pthread_mutex_destroy(&stop_lock_);
00990                       if(retval != 0) perror("error");
00991                       retval = pthread_mutex_init(&stop_lock_,0);
00992                       if(retval != 0) perror("error");
00993                       fsm_.disableRcmsStateNotification();
00994                       fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
00995                       fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition
00996                       fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition
00997                       try{
00998                         xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
00999                         if(lsid) {
01000                           ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died
01001                         }
01002                       }
01003                       catch(...){
01004                         std::cout << "trouble with lsindex during restart" << std::endl;
01005                       }
01006                       try{
01007                         xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
01008                         if(lstb) {
01009                           ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting
01010                         }
01011                       }
01012                       catch(...){
01013                         std::cout << "trouble with resetting flag for eol recovery " << std::endl;
01014                       }
01015 
01016                       evtProcessor_.adjustLsIndexForRestart();
01017                       evtProcessor_.resetPackedTriggerReport();
01018                       enableMPEPSlave();
01019                       return false; // exit the supervisor loop immediately in the child !!!
01020                     }
01021                   else
01022                     {
01023                       std::ostringstream ost1;
01024                       ost1 << "-I- New Process " << rr << " forked for slot " << i; 
01025                       localLog(ost1.str());
01026                     }
01027                 }
01028               if(subs_[i].countdown()>=0) subs_[i].countdown()--;
01029             }
01030           }
01031         pthread_mutex_unlock(&stop_lock_);
01032       } // finished handling replacement of dead slaves once they've been reaped
01033     }
01034   xdata::Serializable *lsid = 0; 
01035   xdata::Serializable *psid = 0;
01036   xdata::Serializable *epMAltState = 0; 
01037   xdata::Serializable *epmAltState = 0;
01038   xdata::Serializable *dqmp = 0;
01039   xdata::UnsignedInteger32 *dqm = 0;
01040 
01041 
01042   
01043   MsgBuf msg1(0,MSQM_MESSAGE_TYPE_PRG);
01044   MsgBuf msg2(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_PRR);
01045   if(running){  
01046     try{
01047       lsid = applicationInfoSpace_->find("lumiSectionIndex");
01048       psid = applicationInfoSpace_->find("prescaleSetIndex");
01049       nbProcessed = monitorInfoSpace_->find("nbProcessed");
01050       nbAccepted  = monitorInfoSpace_->find("nbAccepted");
01051       epMAltState = monitorInfoSpace_->find("epSPMacroStateInt");
01052       epmAltState = monitorInfoSpace_->find("epSPMicroStateInt");
01053       dqmp = applicationInfoSpace_-> find("nbDqmUpdates");      
01054     }
01055     catch(xdata::exception::Exception e){
01056       LOG4CPLUS_INFO(getApplicationLogger(),"could not retrieve some data - " << e.what());    
01057     }
01058 
01059     try{
01060       if(nbProcessed !=0 && nbAccepted !=0)
01061         {
01062           xdata::UnsignedInteger32*nbp = ((xdata::UnsignedInteger32*)nbProcessed);
01063           xdata::UnsignedInteger32*nba = ((xdata::UnsignedInteger32*)nbAccepted);
01064           xdata::UnsignedInteger32*ls  = ((xdata::UnsignedInteger32*)lsid);
01065           xdata::UnsignedInteger32*ps  = ((xdata::UnsignedInteger32*)psid);
01066           if(dqmp!=0)
01067             dqm = (xdata::UnsignedInteger32*)dqmp;
01068           if(dqm) dqm->value_ = 0;
01069           nbTotalDQM_ = 0;
01070           nbp->value_ = 0;
01071           nba->value_ = 0;
01072           nblive_ = 0;
01073           nbdead_ = 0;
01074           scalersUpdates_ = 0;
01075 
01076           for(unsigned int i = 0; i < subs_.size(); i++)
01077             {
01078               if(subs_[i].alive()>0)
01079                 {
01080                   nblive_++;
01081                   try{
01082                     subs_[i].post(msg1,true);
01083                     
01084                     unsigned long retval = subs_[i].rcvNonBlocking(msg2,true);
01085                     if(retval == (unsigned long) msg2->mtype){
01086                       prg* p = (struct prg*)(msg2->mtext);
01087                       subs_[i].setParams(p);
01088                       spMStates_[i] = p->Ms;
01089                       spmStates_[i] = p->ms;
01090                       cpustat_->addEntry(p->ms);
01091                       if(!subs_[i].inInconsistentState() && 
01092                          (p->Ms == edm::event_processor::sError 
01093                           || p->Ms == edm::event_processor::sInvalid
01094                           || p->Ms == edm::event_processor::sStopping))
01095                         {
01096                           std::ostringstream ost;
01097                           ost << "edm::eventprocessor slot " << i << " process id " 
01098                               << subs_[i].pid() << " not in Running state : Mstate=" 
01099                               << evtProcessor_.stateNameFromIndex(p->Ms) << " mstate="
01100                               << evtProcessor_.moduleNameFromIndex(p->ms) 
01101                               << " - Look into possible error messages from HLT process";
01102                           LOG4CPLUS_WARN(getApplicationLogger(),ost.str());
01103                         }
01104                       nbp->value_ += subs_[i].params().nbp;
01105                       nba->value_  += subs_[i].params().nba;
01106                       if(dqm)dqm->value_ += p->dqm;
01107                       nbTotalDQM_ +=  p->dqm;
01108                       scalersUpdates_ += p->trp;
01109                       if(p->ls > ls->value_) ls->value_ = p->ls;
01110                       if(p->ps != ps->value_) ps->value_ = p->ps;
01111                     }
01112                     else{
01113                       nbp->value_ += subs_[i].get_save_nbp();
01114                       nba->value_ += subs_[i].get_save_nba();
01115                     }
01116                   } 
01117                   catch(evf::Exception &e){
01118                     LOG4CPLUS_INFO(getApplicationLogger(),
01119                                    "could not send/receive msg on slot " 
01120                                    << i << " - " << e.what());    
01121                   }
01122                     
01123                 }
01124               else
01125                 {
01126                   nbp->value_ += subs_[i].get_save_nbp();
01127                   nba->value_ += subs_[i].get_save_nba();
01128                   nbdead_++;
01129                 }
01130             }
01131         }
01132       
01133     }
01134     catch(std::exception &e){
01135       LOG4CPLUS_INFO(getApplicationLogger(),"std exception - " << e.what());    
01136     }
01137     catch(...){
01138       LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception ");    
01139     }
01140   }
01141   else{
01142     for(unsigned int i = 0; i < subs_.size(); i++)
01143       {
01144         if(subs_[i].alive()==-1000)
01145           {
01146             spMStates_[i] = edm::event_processor::sInit;
01147             spmStates_[i] = 0;
01148           }
01149       }
01150   }
01151   try{
01152     monitorInfoSpace_->lock();
01153     monitorInfoSpace_->fireItemGroupChanged(names_,0);
01154     monitorInfoSpace_->unlock();
01155   }
01156   catch(xdata::exception::Exception &e)
01157     {
01158       LOG4CPLUS_ERROR(log_, "Exception from fireItemGroupChanged: " << e.what());
01159       //        localLog(e.what());
01160     }
01161   ::sleep(superSleepSec_.value_);       
01162   return true;
01163 }
01164 
01165 void FUEventProcessor::startScalersWorkLoop() throw (evf::Exception)
01166 {
01167   try {
01168     wlScalers_=
01169       toolbox::task::getWorkLoopFactory()->getWorkLoop("Scalers",
01170                                                        "waiting");
01171     if (!wlScalers_->isActive()) wlScalers_->activate();
01172     asScalers_ = toolbox::task::bind(this,&FUEventProcessor::scalers,
01173                                      "Scalers");
01174     
01175   wlScalers_->submit(asScalers_);
01176   wlScalersActive_ = true;
01177   }
01178   catch (xcept::Exception& e) {
01179     std::string msg = "Failed to start workloop 'Scalers'.";
01180     XCEPT_RETHROW(evf::Exception,msg,e);
01181   }
01182 }
01183 
01184 //______________________________________________________________________________
01185 
01186 void FUEventProcessor::startSummarizeWorkLoop() throw (evf::Exception)
01187 {
01188   try {
01189     wlSummarize_=
01190       toolbox::task::getWorkLoopFactory()->getWorkLoop("Summary",
01191                                                        "waiting");
01192     if (!wlSummarize_->isActive()) wlSummarize_->activate();
01193     
01194     asSummarize_ = toolbox::task::bind(this,&FUEventProcessor::summarize,
01195                                        "Summary");
01196 
01197     wlSummarize_->submit(asSummarize_);
01198     wlSummarizeActive_ = true;
01199   }
01200   catch (xcept::Exception& e) {
01201     std::string msg = "Failed to start workloop 'Summarize'.";
01202     XCEPT_RETHROW(evf::Exception,msg,e);
01203   }
01204 }
01205 
01206 //______________________________________________________________________________
01207 
01208 bool FUEventProcessor::scalers(toolbox::task::WorkLoop* wl)
01209 {
01210   if(evtProcessor_)
01211     {
01212       if(!evtProcessor_.getTriggerReport(true)) {
01213         wlScalersActive_ = false;
01214         return false;
01215       }
01216     }
01217   else
01218     {
01219       std::cout << getpid()<< " Scalers workloop, bailing out, no evtProcessor " << std::endl;
01220       wlScalersActive_ = false;
01221       return false;
01222     }
01223   if(myProcess_) 
01224     {
01225       //      std::cout << getpid() << "going to post on control queue from scalers" << std::endl;
01226       int ret = myProcess_->postSlave(evtProcessor_.getPackedTriggerReport(),false);
01227       if(ret!=0)      std::cout << "scalers workloop, error posting to sqs_ " << errno << std::endl;
01228       scalersUpdates_++;
01229     }
01230   else
01231     evtProcessor_.fireScalersUpdate();
01232   return true;
01233 }
01234 
01235 //______________________________________________________________________________
01236 bool FUEventProcessor::summarize(toolbox::task::WorkLoop* wl)
01237 {
01238   MsgBuf msg(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_TRR);
01239   evtProcessor_.resetPackedTriggerReport();
01240   bool atLeastOneProcessUpdatedSuccessfully = false;
01241   int msgCount = 0;
01242   for (unsigned int i = 0; i < subs_.size(); i++)
01243     {
01244       if(subs_[i].alive()>0)
01245         {
01246 
01247           int ret = 0;
01248           try{
01249             ret = subs_[i].rcv(msg,false);
01250             msgCount++;
01251           }
01252           catch(evf::Exception &e)
01253             {
01254               std::cout << "exception in msgrcv on " << i 
01255                         << " " << subs_[i].alive() << " " << strerror(errno) << std::endl;
01256               continue;
01257               //do nothing special
01258             }
01259           
01260 
01261           if(ret==MSQS_MESSAGE_TYPE_TRR) {
01262             TriggerReportStatic *trp = (TriggerReportStatic *)msg->mtext;
01263             if(trp->lumiSection > evtProcessor_.getLumiSectionReferenceIndex()){
01264               std::cout << "postpone handling of msg from slot " << i << " with Ls " <<  trp->lumiSection
01265                         << " should be " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01266               subs_[i].post(msg,false);
01267             }else{
01268               atLeastOneProcessUpdatedSuccessfully = true;
01269               evtProcessor_.sumAndPackTriggerReport(msg);
01270             }
01271           }
01272           else std::cout << "msgrcv returned error " << errno << std::endl;
01273         }
01274     }
01275   if(atLeastOneProcessUpdatedSuccessfully){
01276     nbSubProcessesReporting_.value_ = msgCount;
01277     evtProcessor_.updateRollingReport();
01278     evtProcessor_.fireScalersUpdate();
01279   }
01280   else{
01281     LOG4CPLUS_WARN(getApplicationLogger(),"Summarize loop: no process updated successfully - sleep 10 seconds before trying again");          
01282     if(msgCount==0) evtProcessor_.withdrawLumiSectionIncrement();
01283     nbSubProcessesReporting_.value_ = 0;
01284     ::sleep(10);
01285   }
01286   if(fsm_.stateName()->toString()!="Enabled"){
01287     wlScalersActive_ = false;
01288     return false;
01289   }
01290   //  cpustat_->printStat();
01291   if(iDieStatisticsGathering_.value_){
01292     try{
01293       cpustat_ ->sendStat(evtProcessor_.getLumiSectionReferenceIndex());
01294       ratestat_->sendStat((unsigned char*)(evtProcessor_.getPackedTriggerReportAsStruct()),
01295                           sizeof(TriggerReportStatic),
01296                           evtProcessor_.getLumiSectionReferenceIndex());
01297     }catch(evf::Exception &e){
01298       LOG4CPLUS_INFO(getApplicationLogger(),"coud not send statistics"
01299                      << e.what());
01300     }
01301   }
01302   cpustat_->reset();
01303   return true;
01304 }
01305 
01306 
01307 
01308 bool FUEventProcessor::receivingAndMonitor(toolbox::task::WorkLoop *)
01309 {
01310   MsgBuf msg;
01311   try{
01312     myProcess_->rcvSlave(msg,true); //will receive only messages from Master
01313     switch(msg->mtype)
01314       {
01315       case MSQM_MESSAGE_TYPE_MCS:
01316         {
01317           xgi::Input *in = 0;
01318           xgi::Output out;
01319           evtProcessor_.microState(in,&out);
01320           MsgBuf msg1(out.str().size(),MSQS_MESSAGE_TYPE_MCR);
01321           strncpy(msg1->mtext,out.str().c_str(),out.str().size());
01322           myProcess_->postSlave(msg1,true);
01323           break;
01324         }
01325       
01326       case MSQM_MESSAGE_TYPE_PRG:
01327         {
01328           xdata::Serializable *dqmp = 0;
01329           xdata::UnsignedInteger32 *dqm = 0;
01330           evtProcessor_.monitoring(0);
01331           try{
01332             dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
01333           }  catch(xdata::exception::Exception e){}
01334           if(dqmp!=0)
01335             dqm = (xdata::UnsignedInteger32*)dqmp;
01336           MsgBuf msg1(sizeof(prg),MSQS_MESSAGE_TYPE_PRR);
01337           //      monitorInfoSpace_->lock();  
01338           prg * data           = (prg*)msg1->mtext;
01339           data->ls             = evtProcessor_.lsid_;
01340           data->ps             = evtProcessor_.psid_;
01341           data->nbp            = evtProcessor_->totalEvents();
01342           data->nba            = evtProcessor_->totalEventsPassed();
01343           data->Ms             = evtProcessor_.epMAltState_.value_;
01344           data->ms             = evtProcessor_.epmAltState_.value_;
01345           if(dqm) data->dqm    = dqm->value_; else data->dqm = 0;
01346           data->trp            = scalersUpdates_;
01347           //      monitorInfoSpace_->unlock();  
01348           myProcess_->postSlave(msg1,true);
01349           if(exitOnError_.value_)
01350           { 
01351             // after each monitoring cycle check if we are in inconsistent state and exit if configured to do so  
01352             //      std::cout << getpid() << "receivingAndMonitor: trying to acquire stop lock " << std::endl;
01353             if(data->Ms == edm::event_processor::sStopping || data->Ms == edm::event_processor::sError) 
01354               { 
01355                 bool running = true;
01356                 int count = 0;
01357                 while(running){
01358                   int retval = pthread_mutex_lock(&stop_lock_);
01359                   if(retval != 0) perror("error");
01360                   running = fsm_.stateName()->toString()=="Enabled";
01361                   if(count>5) exit(-1);
01362                   pthread_mutex_unlock(&stop_lock_);
01363                   if(running) {::sleep(1); count++;}
01364                 }
01365               }
01366             
01367           }
01368           //      scalersUpdates_++;
01369           break;
01370         }
01371       case MSQM_MESSAGE_TYPE_WEB:
01372         {
01373           xgi::Input  *in = 0;
01374           xgi::Output out;
01375           unsigned int bytesToSend = 0;
01376           MsgBuf msg1(NUMERIC_MESSAGE_SIZE,MSQS_MESSAGE_TYPE_WEB);
01377           std::string query = msg->mtext;
01378           size_t pos = query.find_first_of("&");
01379           std::string method;
01380           std::string args;
01381           if(pos!=std::string::npos)  
01382             {
01383               method = query.substr(0,pos);
01384               args = query.substr(pos+1,query.length()-pos-1);
01385             }
01386           else
01387             method=query;
01388 
01389           if(method=="Spotlight")
01390             {
01391               spotlightWebPage(in,&out);
01392             }
01393           else if(method=="procStat")
01394             {
01395               procStat(in,&out);
01396             }
01397           else if(method=="moduleWeb")
01398             {
01399               internal::MyCgi mycgi;
01400               boost::char_separator<char> sep(";");
01401               boost::tokenizer<boost::char_separator<char> > tokens(args, sep);
01402               for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
01403                    tok_iter != tokens.end(); ++tok_iter){
01404                 size_t pos = (*tok_iter).find_first_of("%");
01405                 if(pos != std::string::npos){
01406                   std::string first  = (*tok_iter).substr(0    ,                        pos);
01407                   std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
01408                   mycgi.getEnvironment()[first]=second;
01409                 }
01410               }
01411               moduleWeb(&mycgi,&out);
01412             }
01413           else if(method=="Default")
01414             {
01415               defaultWebPage(in,&out);
01416             }
01417           else 
01418             {
01419               out << "Error 404!!!!!!!!" << std::endl;
01420             }
01421 
01422 
01423           bytesToSend = out.str().size();
01424           unsigned int cycle = 0;
01425           if(bytesToSend==0)
01426             {
01427               snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", bytesToSend);
01428               myProcess_->postSlave(msg1,true);
01429             }
01430           while(bytesToSend !=0){
01431             unsigned int msgSize = bytesToSend>MAX_PIPE_BUFFER_SIZE ? MAX_PIPE_BUFFER_SIZE : bytesToSend;
01432             snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", msgSize);
01433             write(anonymousPipe_[PIPE_WRITE],
01434                   out.str().c_str()+MAX_PIPE_BUFFER_SIZE*cycle,
01435                   msgSize);
01436             myProcess_->postSlave(msg1,true);
01437             bytesToSend -= msgSize;
01438             cycle++;
01439           }
01440           break;
01441         }
01442       case MSQM_MESSAGE_TYPE_TRP:
01443         {
01444           break;
01445         }
01446       }
01447   }
01448   catch(evf::Exception &e){std::cout << "exception caught in recevingM: " << e.what() << std::endl;}
01449   return true;
01450 }
01451 
01452 bool FUEventProcessor::enableCommon()
01453 {
01454   try {    
01455     if(hasShMem_) attachDqmToShm();
01456     int sc = 0;
01457     evtProcessor_->clearCounters();
01458     if(isRunNumberSetter_)
01459       evtProcessor_->setRunNumber(runNumber_.value_);
01460     else
01461       evtProcessor_->declareRunNumber(runNumber_.value_);
01462     try{
01463       ::sleep(1);
01464       evtProcessor_->runAsync();
01465       sc = evtProcessor_->statusAsync();
01466     }
01467     catch(cms::Exception &e) {
01468       reasonForFailedState_ = e.explainSelf();
01469       fsm_.fireFailed(reasonForFailedState_,this);
01470       localLog(reasonForFailedState_);
01471       return false;
01472     }    
01473     catch(std::exception &e) {
01474       reasonForFailedState_  = e.what();
01475       fsm_.fireFailed(reasonForFailedState_,this);
01476       localLog(reasonForFailedState_);
01477       return false;
01478     }
01479     catch(...) {
01480       reasonForFailedState_ = "Unknown Exception";
01481       fsm_.fireFailed(reasonForFailedState_,this);
01482       localLog(reasonForFailedState_);
01483       return false;
01484     }
01485     if(sc != 0) {
01486       std::ostringstream oss;
01487       oss<<"EventProcessor::runAsync returned status code " << sc;
01488       reasonForFailedState_ = oss.str();
01489       fsm_.fireFailed(reasonForFailedState_,this);
01490       localLog(reasonForFailedState_);
01491       return false;
01492     }
01493   }
01494   catch (xcept::Exception &e) {
01495     reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
01496     fsm_.fireFailed(reasonForFailedState_,this);
01497     localLog(reasonForFailedState_);
01498     return false;
01499   }
01500   try{
01501     fsm_.fireEvent("EnableDone",this);
01502   }
01503   catch (xcept::Exception &e) {
01504     std::cout << "exception " << (std::string)e.what() << std::endl;
01505     throw;
01506   }
01507 
01508   return false;
01509 }
01510   
01511 bool FUEventProcessor::enableClassic()
01512 {
01513   bool retval = enableCommon();
01514   while(evtProcessor_->getState()!= edm::event_processor::sRunning){
01515     LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
01516     ::sleep(1);
01517   }
01518   
01519   //  implementation moved to EPWrapper
01520   //  startScalersWorkLoop(); // this is now not done any longer 
01521   localLog("-I- Start completed");
01522   return retval;
01523 }
01524 bool FUEventProcessor::enableMPEPSlave()
01525 {
01526   //all this happens only in the child process
01527   startReceivingLoop();
01528   startReceivingMonitorLoop();
01529   evtProcessor_.resetWaiting();
01530   startScalersWorkLoop();
01531   while(!evtProcessor_.isWaitingForLs())
01532     ::sleep(1);
01533   // @EM test do not run monitor loop in slave, only receiving&Monitor
01534   //  evtProcessor_.startMonitoringWorkLoop();
01535   try{
01536     //    evtProcessor_.makeServicesOnly();
01537     try{
01538       LOG4CPLUS_DEBUG(getApplicationLogger(),
01539                       "Trying to create message service presence ");
01540       edm::PresenceFactory *pf = edm::PresenceFactory::get();
01541       if(pf != 0) {
01542         pf->makePresence("MessageServicePresence").release();
01543       }
01544       else {
01545         LOG4CPLUS_ERROR(getApplicationLogger(),
01546                         "Unable to create message service presence ");
01547       }
01548     } 
01549     catch(...) {
01550       LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
01551     }
01552   ML::MLlog4cplus::setAppl(this);      
01553   }       
01554   catch (xcept::Exception &e) {
01555     reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
01556     fsm_.fireFailed(reasonForFailedState_,this);
01557     localLog(reasonForFailedState_);
01558   }
01559   bool retval =  enableCommon();
01560   //  while(evtProcessor_->getState()!= edm::event_processor::sRunning){
01561   //    LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
01562   //    ::sleep(1);
01563   //  }
01564   return retval;
01565 }
01566 
01567 bool FUEventProcessor::stopClassic()
01568 {
01569   try {
01570     LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping :) ...");
01571     edm::EventProcessor::StatusCode rc = evtProcessor_.stop();
01572     if(rc == edm::EventProcessor::epSuccess) 
01573       fsm_.fireEvent("StopDone",this);
01574     else
01575       {
01576         //      epMState_ = evtProcessor_->currentStateName();
01577         if(rc == edm::EventProcessor::epTimedOut)
01578           reasonForFailedState_ = "EventProcessor stop timed out";
01579         else
01580           reasonForFailedState_ = "EventProcessor did not receive STOP event";
01581         fsm_.fireFailed(reasonForFailedState_,this);
01582         localLog(reasonForFailedState_);
01583       }
01584     if(hasShMem_) detachDqmFromShm();
01585   }
01586   catch (xcept::Exception &e) {
01587     reasonForFailedState_ = "stopping FAILED: " + (std::string)e.what();
01588     localLog(reasonForFailedState_);
01589     fsm_.fireFailed(reasonForFailedState_,this);
01590   }
01591   LOG4CPLUS_INFO(getApplicationLogger(),"Finished stopping!");
01592   localLog("-I- Stop completed");
01593   return false;
01594 }
01595 
01596 void FUEventProcessor::stopSlavesAndAcknowledge()
01597 {
01598   MsgBuf msg(0,MSQM_MESSAGE_TYPE_STOP);
01599   MsgBuf msg1(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_STOP);
01600 
01601   for(unsigned int i = 0; i < subs_.size(); i++)
01602     {
01603       pthread_mutex_lock(&stop_lock_);
01604       if(subs_[i].alive()>0)subs_[i].post(msg,false);
01605       pthread_mutex_unlock(&stop_lock_);
01606     }
01607   for(unsigned int i = 0; i < subs_.size(); i++)
01608     {
01609       pthread_mutex_lock(&stop_lock_);
01610       if(subs_[i].alive()>0){
01611         try{
01612           subs_[i].rcv(msg1,false);
01613         }
01614         catch(evf::Exception &e){
01615           std::ostringstream ost;
01616           ost << "failed to get STOP - errno ->" << errno << " " << e.what(); 
01617           reasonForFailedState_ = ost.str();
01618           LOG4CPLUS_WARN(getApplicationLogger(),reasonForFailedState_);
01619           fsm_.fireFailed(reasonForFailedState_,this);
01620           localLog(reasonForFailedState_);
01621           break;
01622         }
01623       }
01624       else {
01625         pthread_mutex_unlock(&stop_lock_);
01626         continue;
01627       }
01628       pthread_mutex_unlock(&stop_lock_);
01629       if(msg1->mtype==MSQS_MESSAGE_TYPE_STOP)
01630         while(subs_[i].alive()>0) ::usleep(10000);
01631       subs_[i].disconnect();
01632     }
01633   //  subs_.clear();
01634 
01635 }
01636 
01637 void FUEventProcessor::microState(xgi::Input *in,xgi::Output *out)
01638 {
01639   std::string urn = getApplicationDescriptor()->getURN();
01640   try{
01641     evtProcessor_.stateNameFromIndex(0);
01642     evtProcessor_.moduleNameFromIndex(0);
01643   if(myProcess_) {std::cout << "microstate called for child! bail out" << std::endl; return;}
01644   *out << "<tr><td>" << fsm_.stateName()->toString() 
01645        << "</td><td>"<< (myProcess_ ? "S" : "M") <<"</td><td>" << nblive_ << "</td><td>"
01646        << nbdead_ << "</td><td><a href=\"/" << urn << "/procStat\">" << getpid() <<"</a></td>";
01647   evtProcessor_.microState(in,out);
01648   *out << "<td>" << nbTotalDQM_ 
01649        << "</td><td>" << evtProcessor_.getScalersUpdates() << "</td></tr>";
01650   if(nbSubProcesses_.value_!=0 && !myProcess_) 
01651     {
01652       pthread_mutex_lock(&start_lock_);
01653       for(unsigned int i = 0; i < subs_.size(); i++)
01654         {
01655           try{
01656             if(subs_[i].alive()>0)
01657               {
01658                 *out << "<tr><td  bgcolor=\"#00FF00\" id=\"a"
01659                      << i << "\">""Alive</td><td>S</td><td>"
01660                      << subs_[i].queueId() << "<td>" 
01661                      << subs_[i].queueStatus()<< "/"
01662                      << subs_[i].queueOccupancy() << "/"
01663                      << subs_[i].queuePidOfLastSend() << "/"
01664                      << subs_[i].queuePidOfLastReceive() 
01665                      << "</td><td><a id=\"p"<< i << "\" href=\"SubWeb?process=" 
01666                      << subs_[i].pid() << "&method=procStat\">" 
01667                      << subs_[i].pid()<<"</a></td>" //<< msg->mtext;
01668                      << "<td>" << evtProcessor_.stateNameFromIndex(subs_[i].params().Ms) << "</td><td>" 
01669                      << evtProcessor_.moduleNameFromIndex(subs_[i].params().ms) << "</td><td>" 
01670                      << subs_[i].params().nba << "/" << subs_[i].params().nbp 
01671                      << " (" << float(subs_[i].params().nba)/float(subs_[i].params().nbp)*100. <<"%)" 
01672                      << "</td><td>" << subs_[i].params().ls  << "/" << subs_[i].params().ls 
01673                      << "</td><td>" << subs_[i].params().ps 
01674                      << "</td><td>" << subs_[i].params().dqm 
01675                      << "</td><td>" << subs_[i].params().trp << "</td>";
01676               }
01677             else 
01678               {
01679                 pthread_mutex_lock(&pickup_lock_);
01680                 *out << "<tr><td id=\"a"<< i << "\" ";
01681                 if(subs_[i].alive()==-1000)
01682                   *out << " bgcolor=\"#bbaabb\">NotInitialized";
01683                 else
01684                   *out << (subs_[i].alive()==0 ? ">Done" : " bgcolor=\"#FF0000\">Dead");
01685                 *out << "</td><td>S</td><td>"<< subs_[i].queueId() << "<td>" 
01686                      << subs_[i].queueStatus() << "/"
01687                      << subs_[i].queueOccupancy() << "/"
01688                      << subs_[i].queuePidOfLastSend() << "/"
01689                      << subs_[i].queuePidOfLastReceive() 
01690                      << "</td><td id=\"p"<< i << "\">"
01691                      <<subs_[i].pid()<<"</td><td colspan=\"5\">" << subs_[i].reasonForFailed();
01692                 if(subs_[i].alive()!=0 && subs_[i].alive()!=-1000) 
01693                   {
01694                     if(autoRestartSlaves_ && subs_[i].restartCount()<=2) 
01695                       *out << " will restart in " << subs_[i].countdown() << " s";
01696                     else if(autoRestartSlaves_)
01697                       *out << " reached maximum restart count";
01698                     else *out << " autoRestart is disabled ";
01699                   }
01700                 *out << "</td><td>" << subs_[i].params().dqm 
01701                      << "</td><td>" << subs_[i].params().trp << "</td>";
01702                 pthread_mutex_unlock(&pickup_lock_);
01703               }
01704             *out << "</tr>";
01705           }
01706           catch(evf::Exception &e){
01707             *out << "<tr><td id=\"a"<< i << "\" " 
01708                  <<"bgcolor=\"#FFFF00\">NotResp</td><td>S</td><td>"<< subs_[i].queueId() << "<td>" 
01709                  << subs_[i].queueStatus() << "/"
01710                  << subs_[i].queueOccupancy() << "/"
01711                  << subs_[i].queuePidOfLastSend() << "/"
01712                  << subs_[i].queuePidOfLastReceive() 
01713                  << "</td><td id=\"p"<< i << "\">"
01714                  <<subs_[i].pid()<<"</td>";
01715           }
01716         }
01717       pthread_mutex_unlock(&start_lock_); 
01718     }
01719   }
01720       catch(evf::Exception &e)
01721       {
01722         LOG4CPLUS_INFO(getApplicationLogger(),"evf::Exception caught in microstate - " << e.what());    
01723       }
01724     catch(cms::Exception &e)
01725       {
01726         LOG4CPLUS_INFO(getApplicationLogger(),"cms::Exception caught in microstate - " << e.what());    
01727       }
01728     catch(std::exception &e)
01729       {
01730         LOG4CPLUS_INFO(getApplicationLogger(),"std::Exception caught in microstate - " << e.what());    
01731       }
01732     catch(...)
01733       {
01734         LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception caught in microstate - ");    
01735       }
01736 
01737 }
01738 
01739 
01740 void FUEventProcessor::updater(xgi::Input *in,xgi::Output *out)
01741 {
01742   using namespace utils;
01743 
01744   *out << updaterStatic_;
01745   mDiv(out,"loads");
01746   uptime(out);
01747   cDiv(out);
01748   mDiv(out,"st",fsm_.stateName()->toString());
01749   mDiv(out,"ru",runNumber_.toString());
01750   mDiv(out,"nsl",nbSubProcesses_.value_);
01751   mDiv(out,"cl");
01752   *out << getApplicationDescriptor()->getClassName() 
01753        << (nbSubProcesses_.value_ > 0 ? "MP " : " ");
01754   cDiv(out);
01755   mDiv(out,"in",getApplicationDescriptor()->getInstance());
01756   if(fsm_.stateName()->toString() != "Halted" && fsm_.stateName()->toString() != "halting"){
01757     mDiv(out,"hlt");
01758     *out << "<a href=\"" << configString_.toString() << "\">HLT Config</a>";
01759     cDiv(out);
01760     *out << std::endl;
01761   }
01762   else
01763     mDiv(out,"hlt","Not yet...");
01764 
01765   mDiv(out,"sq",squidPresent_.toString());
01766   mDiv(out,"vwl",(supervising_ ? "Active" : "Not Initialized"));
01767   mDiv(out,"mwl",evtProcessor_.wlMonitoring());
01768   if(nbProcessed != 0 && nbAccepted != 0)
01769     {
01770       mDiv(out,"tt",((xdata::UnsignedInteger32*)nbProcessed)->value_);
01771       mDiv(out,"ac",((xdata::UnsignedInteger32*)nbAccepted)->value_);
01772     }
01773   else
01774     {
01775       mDiv(out,"tt",0);
01776       mDiv(out,"ac",0);
01777     }
01778   if(!myProcess_)
01779     mDiv(out,"swl",(wlSummarizeActive_ ? "Active" : "Inactive"));
01780   else
01781     mDiv(out,"swl",(wlScalersActive_ ? "Active" : "Inactive"));
01782 
01783   mDiv(out,"idi",iDieUrl_.value_);
01784   if(vp_!=0){
01785     mDiv(out,"vpi",(unsigned int) vp_);
01786     if(vulture_->hasStarted()>=0)
01787       mDiv(out,"vul","Prowling");
01788     else
01789       mDiv(out,"vul","Dead");
01790   }
01791   else{
01792     mDiv(out,"vul",(vulture_==0 ? "Nope" : "Hatching"));
01793   }    
01794   if(evtProcessor_){
01795     mDiv(out,"ll");
01796     *out << evtProcessor_.lastLumi().ls
01797          << "," << evtProcessor_.lastLumi().proc << "," << evtProcessor_.lastLumi().acc;
01798     cDiv(out);
01799   }
01800   mDiv(out,"lg");
01801   for(unsigned int i = logRingIndex_; i<logRingSize_; i++)
01802     *out << logRing_[i] << std::endl;
01803   if(logWrap_)
01804     for(unsigned int i = 0; i<logRingIndex_; i++)
01805       *out << logRing_[i] << std::endl;
01806   cDiv(out);
01807   
01808 }
01809 
01810 void FUEventProcessor::procStat(xgi::Input *in, xgi::Output *out)
01811 {
01812   evf::utils::procStat(out);
01813 }
01814 
01815 void FUEventProcessor::sendMessageOverMonitorQueue(MsgBuf &buf)
01816 {
01817   if(myProcess_) myProcess_->postSlave(buf,true);
01818 }
01819 
01820 void FUEventProcessor::makeStaticInfo()
01821 {
01822   using namespace utils;
01823   std::ostringstream ost;
01824   mDiv(&ost,"ve");
01825   ost<< "$Revision: 1.124 $ (" << edm::getReleaseVersion() <<")";
01826   cDiv(&ost);
01827   mDiv(&ost,"ou",outPut_.toString());
01828   mDiv(&ost,"sh",hasShMem_.toString());
01829   mDiv(&ost,"mw",hasModuleWebRegistry_.toString());
01830   mDiv(&ost,"sw",hasServiceWebRegistry_.toString());
01831   
01832   xdata::Serializable *monsleep = 0;
01833   xdata::Serializable *lstimeout = 0;
01834   try{
01835     monsleep  = applicationInfoSpace_->find("monSleepSec");
01836     lstimeout = applicationInfoSpace_->find("lsTimeOut");
01837   }
01838   catch(xdata::exception::Exception e){
01839   }
01840   
01841   if(monsleep!=0)
01842     mDiv(&ost,"ms",monsleep->toString());
01843   if(lstimeout!=0)
01844     mDiv(&ost,"lst",lstimeout->toString());
01845   char cbuf[sizeof(struct utsname)];
01846   struct utsname* buf = (struct utsname*)cbuf;
01847   uname(buf);
01848   mDiv(&ost,"sysinfo");
01849   ost << buf->sysname << " " << buf->nodename 
01850       << " " << buf->release << " " << buf->version << " " << buf->machine;
01851   cDiv(&ost);
01852   updaterStatic_ = ost.str();
01853 }
01854 
01855 XDAQ_INSTANTIATOR_IMPL(evf::FUEventProcessor)