CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_0/src/EventFilter/Processor/src/FUEventProcessor.cc

Go to the documentation of this file.
00001 
00002 //
00003 // FUEventProcessor
00004 // ----------------
00005 //
00007 
00008 #include "FUEventProcessor.h"
00009 #include "procUtils.h"
00010 #include "EventFilter/Utilities/interface/CPUStat.h"
00011 #include "EventFilter/Utilities/interface/RateStat.h"
00012 
00013 #include "EventFilter/Utilities/interface/Exception.h"
00014 
00015 #include "EventFilter/Message2log4cplus/interface/MLlog4cplus.h"
00016 #include "EventFilter/Modules/interface/FUShmDQMOutputService.h"
00017 #include "EventFilter/Utilities/interface/ServiceWebRegistry.h"
00018 #include "EventFilter/Utilities/interface/ServiceWeb.h"
00019 #include "EventFilter/Utilities/interface/ModuleWeb.h"
00020 #include "EventFilter/Utilities/interface/ModuleWebRegistry.h"
00021 #include "EventFilter/Modules/src/FUShmOutputModule.h"
00022 
00023 #include "FWCore/PluginManager/interface/ProblemTracker.h"
00024 #include "FWCore/PluginManager/interface/PresenceFactory.h"
00025 #include "FWCore/Utilities/interface/Presence.h"
00026 #include "FWCore/Utilities/interface/Exception.h"
00027 #include "FWCore/Version/interface/GetReleaseVersion.h"
00028 #include "FWCore/ServiceRegistry/interface/Service.h"
00029 
00030 #include "toolbox/BSem.h" 
00031 
00032 #include <boost/tokenizer.hpp>
00033 
00034 #include "xcept/tools.h"
00035 #include "xgi/Method.h"
00036 
00037 #include "cgicc/CgiDefs.h"
00038 #include "cgicc/Cgicc.h"
00039 #include "cgicc/FormEntry.h"
00040 
00041 
00042 #include <sys/wait.h>
00043 #include <sys/utsname.h>
00044 
00045 #include <typeinfo>
00046 #include <stdlib.h>
00047 #include <stdio.h>
00048 #include <errno.h>
00049 
00050 using namespace evf;
00051 using namespace cgicc;
00052 namespace toolbox {
00053   namespace mem {
00054     extern toolbox::BSem * _s_mutex_ptr_; 
00055   }
00056 }
00057 
00059 // construction/destruction
00061 
00062 //______________________________________________________________________________
00063 FUEventProcessor::FUEventProcessor(xdaq::ApplicationStub *s) 
00064   : xdaq::Application(s)
00065   , fsm_(this)
00066   , log_(getApplicationLogger())
00067   , evtProcessor_(log_, getApplicationDescriptor()->getInstance())
00068   , runNumber_(0)
00069   , epInitialized_(false)
00070   , outPut_(true)
00071   , autoRestartSlaves_(false)
00072   , slaveRestartDelaySecs_(10)
00073   , hasShMem_(true)
00074   , hasPrescaleService_(true)
00075   , hasModuleWebRegistry_(true)
00076   , hasServiceWebRegistry_(true)
00077   , isRunNumberSetter_(true)
00078   , iDieStatisticsGathering_(false)
00079   , outprev_(true)
00080   , exitOnError_(true)
00081   , reasonForFailedState_()
00082   , squidnet_(3128,"https://localhost:8000/RELEASE-NOTES.txt")
00083   , logRing_(logRingSize_)
00084   , logRingIndex_(logRingSize_)
00085   , logWrap_(false)
00086   , nbSubProcesses_(0)
00087   , nbSubProcessesReporting_(0)
00088   , forkInEDM_(true)
00089   , nblive_(0)
00090   , nbdead_(0)
00091   , nbTotalDQM_(0)
00092   , wlReceiving_(0)
00093   , asReceiveMsgAndExecute_(0)
00094   , receiving_(false) 
00095   , wlReceivingMonitor_(0)
00096   , asReceiveMsgAndRead_(0)
00097   , receivingM_(false)
00098   , myProcess_(0)
00099   , wlSupervising_(0)
00100   , asSupervisor_(0)
00101   , supervising_(false)
00102   , monitorInfoSpace_(0)
00103   , monitorLegendaInfoSpace_(0)
00104   , applicationInfoSpace_(0)
00105   , nbProcessed(0)
00106   , nbAccepted(0)
00107   , scalersInfoSpace_(0)
00108   , scalersLegendaInfoSpace_(0)
00109   , wlScalers_(0)
00110   , asScalers_(0)
00111   , wlScalersActive_(false)
00112   , scalersUpdates_(0)
00113   , wlSummarize_(0)
00114   , asSummarize_(0)
00115   , wlSummarizeActive_(false)
00116   , superSleepSec_(1)
00117   , iDieUrl_("none")
00118   , vulture_(0)
00119   , vp_(0)
00120   , cpustat_(0)
00121   , ratestat_(0)
00122   , mwrRef_(nullptr)
00123   , sorRef_(nullptr)
00124   , master_message_prg_(0,MSQM_MESSAGE_TYPE_PRG)
00125   , master_message_prr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_PRR)
00126   , slave_message_prr_(sizeof(prg),MSQS_MESSAGE_TYPE_PRR)
00127   , master_message_trr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_TRR)
00128   , edm_init_done_(true)
00129 {
00130   using namespace utils;
00131 
00132   names_.push_back("nbProcessed"    );
00133   names_.push_back("nbAccepted"     );
00134   names_.push_back("epMacroStateInt");
00135   names_.push_back("epMicroStateInt");
00136   // create pipe for web communication
00137   int retpipe = pipe(anonymousPipe_);
00138   if(retpipe != 0)
00139         LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to create pipe");
00140   // check squid presence
00141   squidPresent_ = squidnet_.check();
00142   //pass application parameters to FWEPWrapper
00143   evtProcessor_.setAppDesc(getApplicationDescriptor());
00144   evtProcessor_.setAppCtxt(getApplicationContext());
00145   // bind relevant callbacks to finite state machine
00146   fsm_.initialize<evf::FUEventProcessor>(this);
00147   
00148   //set sourceId_
00149   url_ =
00150     getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00151     getApplicationDescriptor()->getURN();
00152   class_   =getApplicationDescriptor()->getClassName();
00153   instance_=getApplicationDescriptor()->getInstance();
00154   sourceId_=class_.toString()+"_"+instance_.toString();
00155   LOG4CPLUS_INFO(getApplicationLogger(),sourceId_ <<" constructor"         );
00156   LOG4CPLUS_INFO(getApplicationLogger(),"CMSSW_BASE:"<<getenv("CMSSW_BASE"));
00157   
00158   getApplicationDescriptor()->setAttribute("icon", "/evf/images/epicon.jpg");
00159   
00160   xdata::InfoSpace *ispace = getApplicationInfoSpace();
00161   applicationInfoSpace_ = ispace;
00162 
00163   // default configuration
00164   ispace->fireItemAvailable("parameterSet",         &configString_                );
00165   ispace->fireItemAvailable("epInitialized",        &epInitialized_               );
00166   ispace->fireItemAvailable("stateName",             fsm_.stateName()             );
00167   ispace->fireItemAvailable("runNumber",            &runNumber_                   );
00168   ispace->fireItemAvailable("outputEnabled",        &outPut_                      );
00169 
00170   ispace->fireItemAvailable("hasSharedMemory",      &hasShMem_);
00171   ispace->fireItemAvailable("hasPrescaleService",   &hasPrescaleService_          );
00172   ispace->fireItemAvailable("hasModuleWebRegistry", &hasModuleWebRegistry_        );
00173   ispace->fireItemAvailable("hasServiceWebRegistry", &hasServiceWebRegistry_      );
00174   ispace->fireItemAvailable("isRunNumberSetter",    &isRunNumberSetter_           );
00175   ispace->fireItemAvailable("iDieStatisticsGathering",   &iDieStatisticsGathering_);
00176   ispace->fireItemAvailable("rcmsStateListener",     fsm_.rcmsStateListener()     );
00177   ispace->fireItemAvailable("foundRcmsStateListener",fsm_.foundRcmsStateListener());
00178   ispace->fireItemAvailable("nbSubProcesses",       &nbSubProcesses_              );
00179   ispace->fireItemAvailable("nbSubProcessesReporting",&nbSubProcessesReporting_   );
00180   ispace->fireItemAvailable("forkInEDM"             ,&forkInEDM_                  );
00181   ispace->fireItemAvailable("superSleepSec",        &superSleepSec_               );
00182   ispace->fireItemAvailable("autoRestartSlaves",    &autoRestartSlaves_           );
00183   ispace->fireItemAvailable("slaveRestartDelaySecs",&slaveRestartDelaySecs_       );
00184   ispace->fireItemAvailable("iDieUrl",              &iDieUrl_                     );
00185   
00186   // Add infospace listeners for exporting data values
00187   getApplicationInfoSpace()->addItemChangedListener("parameterSet",        this);
00188   getApplicationInfoSpace()->addItemChangedListener("outputEnabled",       this);
00189 
00190   // findRcmsStateListener
00191   fsm_.findRcmsStateListener();
00192   
00193   // initialize monitoring infospace
00194 
00195   std::string monInfoSpaceName="evf-eventprocessor-status-monitor";
00196   toolbox::net::URN urn = this->createQualifiedInfoSpace(monInfoSpaceName);
00197   monitorInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00198 
00199   std::string monLegendaInfoSpaceName="evf-eventprocessor-status-legenda";
00200   urn = this->createQualifiedInfoSpace(monLegendaInfoSpaceName);
00201   monitorLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00202 
00203   
00204   monitorInfoSpace_->fireItemAvailable("url",                      &url_            );
00205   monitorInfoSpace_->fireItemAvailable("class",                    &class_          );
00206   monitorInfoSpace_->fireItemAvailable("instance",                 &instance_       );
00207   monitorInfoSpace_->fireItemAvailable("runNumber",                &runNumber_      );
00208   monitorInfoSpace_->fireItemAvailable("stateName",                 fsm_.stateName()); 
00209 
00210   monitorInfoSpace_->fireItemAvailable("squidPresent",             &squidPresent_   );
00211 
00212   std::string scalersInfoSpaceName="evf-eventprocessor-scalers-monitor";
00213   urn = this->createQualifiedInfoSpace(scalersInfoSpaceName);
00214   scalersInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00215 
00216   std::string scalersLegendaInfoSpaceName="evf-eventprocessor-scalers-legenda";
00217   urn = this->createQualifiedInfoSpace(scalersLegendaInfoSpaceName);
00218   scalersLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00219 
00220 
00221 
00222   evtProcessor_.setScalersInfoSpace(scalersInfoSpace_,scalersLegendaInfoSpace_);
00223   scalersInfoSpace_->fireItemAvailable("instance", &instance_);
00224 
00225   evtProcessor_.setApplicationInfoSpace(ispace);
00226   evtProcessor_.setMonitorInfoSpace(monitorInfoSpace_,monitorLegendaInfoSpace_);
00227   evtProcessor_.publishConfigAndMonitorItems(nbSubProcesses_.value_!=0);
00228 
00229   //subprocess state vectors for MP
00230   monitorInfoSpace_->fireItemAvailable("epMacroStateInt",             &spMStates_); 
00231   monitorInfoSpace_->fireItemAvailable("epMicroStateInt",             &spmStates_); 
00232   
00233   // Bind web interface
00234   xgi::bind(this, &FUEventProcessor::css,              "styles.css");
00235   xgi::bind(this, &FUEventProcessor::defaultWebPage,   "Default"   );
00236   xgi::bind(this, &FUEventProcessor::spotlightWebPage, "Spotlight" );
00237   xgi::bind(this, &FUEventProcessor::scalersWeb,       "scalersWeb");
00238   xgi::bind(this, &FUEventProcessor::pathNames,        "pathNames" );
00239   xgi::bind(this, &FUEventProcessor::subWeb,           "SubWeb"    );
00240   xgi::bind(this, &FUEventProcessor::moduleWeb,        "moduleWeb" );
00241   xgi::bind(this, &FUEventProcessor::serviceWeb,       "serviceWeb");
00242   xgi::bind(this, &FUEventProcessor::microState,       "microState");
00243   xgi::bind(this, &FUEventProcessor::updater,          "updater"   );
00244   xgi::bind(this, &FUEventProcessor::procStat,         "procStat"  );
00245 
00246   // instantiate the plugin manager, not referenced here after!
00247 
00248   edm::AssertHandler ah;
00249 
00250   try{
00251     LOG4CPLUS_DEBUG(getApplicationLogger(),
00252                     "Trying to create message service presence ");
00253     edm::PresenceFactory *pf = edm::PresenceFactory::get();
00254     if(pf != 0) {
00255       pf->makePresence("MessageServicePresence").release();
00256     }
00257     else {
00258       LOG4CPLUS_ERROR(getApplicationLogger(),
00259                       "Unable to create message service presence ");
00260     }
00261   } 
00262   catch(...) {
00263     LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
00264   }
00265   ML::MLlog4cplus::setAppl(this);      
00266 
00267   typedef std::set<xdaq::ApplicationDescriptor*> AppDescSet_t;
00268   typedef AppDescSet_t::iterator                 AppDescIter_t;
00269   
00270   AppDescSet_t rcms=
00271     getApplicationContext()->getDefaultZone()->
00272     getApplicationDescriptors("RCMSStateListener");
00273   if(rcms.size()==0) 
00274     {
00275       LOG4CPLUS_WARN(getApplicationLogger(),
00276                        "MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
00277       //        localLog("-W- MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
00278     }
00279   else
00280     {
00281       AppDescIter_t it = rcms.begin();
00282       evtProcessor_.setRcms(*it);
00283     }
00284   pthread_mutex_init(&start_lock_,0);
00285   pthread_mutex_init(&stop_lock_,0);
00286   pthread_mutex_init(&pickup_lock_,0);
00287 
00288   forkInfoObj_=nullptr;
00289   pthread_mutex_init(&forkObjLock_,0);
00290   makeStaticInfo();
00291   startSupervisorLoop();  
00292 
00293   if(vulture_==0) vulture_ = new Vulture(true);
00294 
00296 
00297   AppDescSet_t setOfiDie=
00298     getApplicationContext()->getDefaultZone()->
00299     getApplicationDescriptors("evf::iDie");
00300   
00301   for (AppDescIter_t it=setOfiDie.begin();it!=setOfiDie.end();++it)
00302     if ((*it)->getInstance()==0) // there has to be only one instance of iDie
00303       iDieUrl_ = (*it)->getContextDescriptor()->getURL() + "/" + (*it)->getURN();
00304 }
00305 //___________here ends the *huge* constructor___________________________________
00306 
00307 
00308 //______________________________________________________________________________
00309 FUEventProcessor::~FUEventProcessor()
00310 {
00311   // no longer needed since the wrapper is a member of the class and one can rely on 
00312   // implicit destructor - to be revised - at any rate the most common exit path is via "kill"...
00313   //  if (evtProcessor_) delete evtProcessor_;
00314   if(vulture_ != 0) delete vulture_;
00315 }
00316 
00317 
00319 // implementation of member functions
00321 
00322 
00323 //______________________________________________________________________________
00324 bool FUEventProcessor::configuring(toolbox::task::WorkLoop* wl)
00325 {
00326 //   std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
00327 //          << ((instance_.value_==0) ? 0x8 : 0) << " "
00328 //          << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
00329 //          << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
00330 //          << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
00331   unsigned short smap 
00332     = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00333     + (((instance_.value_%80)==0) ? 0x8 : 0) // have at least one legend per slice
00334     + (hasServiceWebRegistry_.value_ ? 0x4 : 0) 
00335     + (hasModuleWebRegistry_.value_ ? 0x2 : 0) 
00336     + (hasPrescaleService_.value_ ? 0x1 : 0);
00337   if(nbSubProcesses_.value_==0) 
00338     {
00339       spMStates_.setSize(1); 
00340       spmStates_.setSize(1); 
00341     }
00342   else
00343     {
00344       spMStates_.setSize(nbSubProcesses_.value_);
00345       spmStates_.setSize(nbSubProcesses_.value_);
00346       for(unsigned int i = 0; i < spMStates_.size(); i++)
00347         {
00348           spMStates_[i] = edm::event_processor::sInit; 
00349           spmStates_[i] = 0; 
00350         }
00351     }
00352   try {
00353     LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ...");
00354     std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00355     epInitialized_=true;
00356     if(evtProcessor_)
00357       {
00358         //get ref of mwr
00359         mwrRef_ = evtProcessor_.getModuleWebRegistry();
00360         sorRef_ = evtProcessor_.getShmOutputModuleRegistry();
00361         // moved to wrapper class
00362         configuration_ = evtProcessor_.configuration();
00363         if(nbSubProcesses_.value_==0) evtProcessor_.startMonitoringWorkLoop(); 
00364         evtProcessor_->beginJob(); 
00365         if(cpustat_) {delete cpustat_; cpustat_=0;}
00366         cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00367                                nbSubProcesses_.value_,
00368                                instance_.value_,
00369                                iDieUrl_.value_);
00370         if(ratestat_) {delete ratestat_; ratestat_=0;}
00371         ratestat_ = new RateStat(iDieUrl_.value_);
00372         if(iDieStatisticsGathering_.value_){
00373           try{
00374             cpustat_->sendLegenda(evtProcessor_.getmicromap());
00375             xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00376             if(legenda !=0){
00377               std::string slegenda = ((xdata::String*)legenda)->value_;
00378               ratestat_->sendLegenda(slegenda);
00379             }
00380 
00381           }
00382           catch(evf::Exception &e){
00383             LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
00384                            << e.what());
00385           }
00386         }
00387         
00388         fsm_.fireEvent("ConfigureDone",this);
00389         LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!");
00390         localLog("-I- Configuration completed");
00391 
00392       }
00393   }
00394   catch (xcept::Exception &e) {
00395     reasonForFailedState_ = "configuring FAILED: " + (std::string)e.what();
00396     fsm_.fireFailed(reasonForFailedState_,this);
00397     localLog(reasonForFailedState_);
00398   }
00399   catch(cms::Exception &e) {
00400     reasonForFailedState_ = e.explainSelf();
00401     fsm_.fireFailed(reasonForFailedState_,this);
00402     localLog(reasonForFailedState_);
00403   }    
00404   catch(std::exception &e) {
00405     reasonForFailedState_ = e.what();
00406     fsm_.fireFailed(reasonForFailedState_,this);
00407     localLog(reasonForFailedState_);
00408   }
00409   catch(...) {
00410     fsm_.fireFailed("Unknown Exception",this);
00411   }
00412 
00413   if(vulture_!=0 && vp_ == 0) vp_ = vulture_->makeProcess();
00414 
00415   return false;
00416 }
00417 
00418 
00419 
00420 
00421 //______________________________________________________________________________
00422 bool FUEventProcessor::enabling(toolbox::task::WorkLoop* wl)
00423 {
00424   nbTotalDQM_ = 0;
00425   scalersUpdates_ = 0;
00426 //   std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
00427 //          << ((instance_.value_==0) ? 0x8 : 0) << " "
00428 //          << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
00429 //          << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
00430 //          << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
00431   unsigned short smap 
00432     = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00433     + (((instance_.value_%80)==0) ? 0x8 : 0) // have at least one legend per slice
00434     + (hasServiceWebRegistry_.value_ ? 0x4 : 0) 
00435     + (hasModuleWebRegistry_.value_ ? 0x2 : 0) 
00436     + (hasPrescaleService_.value_ ? 0x1 : 0);
00437 
00438   LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling...");
00439   if(!epInitialized_){
00440     evtProcessor_.forceInitEventProcessorMaybe();
00441   }
00442   std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00443   
00444   mwrRef_ = evtProcessor_.getModuleWebRegistry();
00445   sorRef_ = evtProcessor_.getShmOutputModuleRegistry();
00446 
00447   if(!epInitialized_){
00448     evtProcessor_->beginJob(); 
00449     if(cpustat_) {delete cpustat_; cpustat_=0;}
00450     cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00451                            nbSubProcesses_.value_,
00452                            instance_.value_,
00453                            iDieUrl_.value_);
00454     if(ratestat_) {delete ratestat_; ratestat_=0;}
00455     ratestat_ = new RateStat(iDieUrl_.value_);
00456     if(iDieStatisticsGathering_.value_){
00457       try
00458         {
00459         cpustat_->sendLegenda(evtProcessor_.getmicromap());
00460         xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00461         if(legenda !=0)
00462           {
00463             std::string slegenda = ((xdata::String*)legenda)->value_;
00464             ratestat_->sendLegenda(slegenda);
00465           }
00466         }
00467       catch(evf::Exception &e)
00468         {
00469           LOG4CPLUS_INFO(getApplicationLogger(),"could not send legenda"
00470                          << e.what());
00471         }
00472       catch (xcept::Exception& e) {
00473         LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda."
00474                         << e.what());
00475       }
00476     }
00477     epInitialized_ = true;
00478   }
00479   configuration_ = evtProcessor_.configuration(); // get it again after init has been carried out...
00480   evtProcessor_.resetLumiSectionReferenceIndex();
00481   //classic appl will return here 
00482   if(nbSubProcesses_.value_==0) return enableClassic();
00483   //protect manipulation of subprocess array
00484   pthread_mutex_lock(&start_lock_);
00485   pthread_mutex_lock(&pickup_lock_);
00486   subs_.clear();
00487   subs_.resize(nbSubProcesses_.value_); // this should not be necessary
00488   pid_t retval = -1;
00489 
00490   for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00491     {
00492       subs_[i]=SubProcess(i,retval); //this will replace all the scattered variables
00493     }
00494   pthread_mutex_unlock(&pickup_lock_);
00495 
00496   pthread_mutex_unlock(&start_lock_);
00497 
00498   //set expected number of child EP's for the Init message(s) sent to the SM
00499   try {
00500     if (sorRef_) {
00501       unsigned int nbExpectedEPWriters = nbSubProcesses_.value_;
00502       if (nbExpectedEPWriters==0) nbExpectedEPWriters=1;//master instance processing
00503       std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
00504       for (unsigned int i=0;i<shmOutputs.size();i++) {
00505         shmOutputs[i]->setNExpectedEPs(nbExpectedEPWriters);
00506       }
00507     }
00508   }
00509   catch (...)
00510   {
00511     LOG4CPLUS_ERROR(getApplicationLogger(),"Thrown Exception while setting nExpectedEPs in shmOutputs");
00512   }
00513 
00514   //use new method if configured
00515   edm_init_done_=true;
00516   if (forkInEDM_.value_) {
00517     edm_init_done_=false;
00518     enableForkInEDM();
00519   }
00520   else
00521     for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00522     {
00523       retval = subs_[i].forkNew();
00524       if(retval==0)
00525         {
00526           myProcess_ = &subs_[i];
00527           // dirty hack: delete/recreate global binary semaphore for later use in child
00528           delete toolbox::mem::_s_mutex_ptr_;
00529           toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
00530           int retval = pthread_mutex_destroy(&stop_lock_);
00531           if(retval != 0) perror("error");
00532           retval = pthread_mutex_init(&stop_lock_,0);
00533           if(retval != 0) perror("error");
00534           fsm_.disableRcmsStateNotification();
00535           
00536           return enableMPEPSlave();
00537           // the loop is broken in the child 
00538         }
00539     }
00540 
00541   if (!edm_init_done_) {
00542     //enable while we wait for beginRun/conditions to load
00543     LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00544     fsm_.fireEvent("EnableDone",this);
00545     localLog("-I- Start completed");
00546 
00547     edm::event_processor::State st;
00548     while (!edm_init_done_) {
00549       usleep(10000);
00550       st = evtProcessor_->getState();
00551       if (st==edm::event_processor::sError || st==edm::event_processor::sInvalid) break;
00552     }
00553     st = evtProcessor_->getState();
00554     //error handling: EP must fork during sRunning
00555     if (st!=edm::event_processor::sRunning) {
00556       reasonForFailedState_ = std::string("Master edm::EventProcessor in state ") + evtProcessor_->stateName(st);
00557       localLog(reasonForFailedState_);
00558       fsm_.fireFailed(reasonForFailedState_,this);
00559       return false;
00560     }
00561     startSummarizeWorkLoop();
00562     vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00563     return false;
00564   }
00565 
00566   startSummarizeWorkLoop();
00567   vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00568   LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00569   fsm_.fireEvent("EnableDone",this);
00570   localLog("-I- Start completed");
00571   return false;
00572 }
00573 
00574 bool FUEventProcessor::doEndRunInEDM() {
00575   //taking care that EP in master is in stoppable state
00576   if (forkInfoObj_) {
00577 
00578     int count = 30;
00579     while (!edm_init_done_ && count) {
00580       ::sleep(1);
00581       if (count%5==0)
00582         LOG4CPLUS_WARN(log_,"MASTER EP: Stopping while EP busy in beginRun. waiting " <<count<< "sec");
00583       count--;
00584     }
00585     //if (count==0) fsm_.fireFailed("failed to stop Master EP",this);
00586 
00587     if (evtProcessor_->getState()==edm::event_processor::sJobReady)
00588       return true;//we are already done
00589 
00590     forkInfoObj_->lock();
00591     forkInfoObj_->stopCondition=true;
00592     sem_post(forkInfoObj_->control_sem_);
00593     forkInfoObj_->unlock();
00594 
00595     count = 30;
00596 
00597     edm::event_processor::State st;
00598     while(count--) {
00599       st = evtProcessor_->getState();
00600       if (st==edm::event_processor::sRunning) ::usleep(100000);
00601       else if (st==edm::event_processor::sStopping || st==edm::event_processor::sJobReady) {
00602         break;
00603       }
00604       else {
00605         LOG4CPLUS_ERROR(getApplicationLogger(),
00606             "Master edm::EventProcessor is in state "<< evtProcessor_->stateName(st) << " while stopping");
00607         return false;
00608       }
00609     }
00610     if (count<0) {
00611       LOG4CPLUS_ERROR(getApplicationLogger(),
00612           "Timeout waiting for Master edm::EventProcessor to go stopping state:"<<evtProcessor_->stateName(st));
00613       return false;
00614     }
00615   }
00616   return true;
00617 }
00618 
00619 //______________________________________________________________________________
00620 bool FUEventProcessor::stopping(toolbox::task::WorkLoop* wl)
00621 {
00622   if(nbSubProcesses_.value_!=0) {
00623     stopSlavesAndAcknowledge();
00624     if (forkInEDM_.value_) doEndRunInEDM();
00625   }
00626   vulture_->stop();
00627 
00628   if (forkInEDM_.value_) {
00629     bool tmpHasShMem_=hasShMem_;
00630     hasShMem_=false;
00631     bool stop_status = stopClassic();
00632     hasShMem_=tmpHasShMem_;
00633     return stop_status;
00634   }
00635   return stopClassic();
00636 }
00637 
00638 
00639 //______________________________________________________________________________
00640 bool FUEventProcessor::halting(toolbox::task::WorkLoop* wl)
00641 {
00642   LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ...");
00643   if(nbSubProcesses_.value_!=0) { 
00644     stopSlavesAndAcknowledge();
00645     if (forkInEDM_.value_) doEndRunInEDM();
00646   }
00647   try{
00648     evtProcessor_.stopAndHalt();
00649     //cleanup forking variables
00650     if (forkInfoObj_) {
00651       delete forkInfoObj_;
00652       forkInfoObj_=0;
00653     }
00654   }
00655   catch (evf::Exception &e) {
00656     reasonForFailedState_ = "halting FAILED: " + (std::string)e.what();
00657     localLog(reasonForFailedState_);
00658     fsm_.fireFailed(reasonForFailedState_,this);
00659   }
00660   //  if(hasShMem_) detachDqmFromShm();
00661 
00662   LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!");
00663   fsm_.fireEvent("HaltDone",this);
00664 
00665   localLog("-I- Halt completed");
00666   return false;
00667 }
00668 
00669 
00670 //______________________________________________________________________________
00671 xoap::MessageReference FUEventProcessor::fsmCallback(xoap::MessageReference msg)
00672   throw (xoap::exception::Exception)
00673 {
00674   return fsm_.commandCallback(msg);
00675 }
00676 
00677 
00678 //______________________________________________________________________________
00679 void FUEventProcessor::actionPerformed(xdata::Event& e)
00680 {
00681 
00682   if (e.type()=="ItemChangedEvent" && fsm_.stateName()->toString()!="Halted") {
00683     std::string item = dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00684     
00685     if ( item == "parameterSet") {
00686       LOG4CPLUS_WARN(getApplicationLogger(),
00687                      "HLT Menu changed, force reinitialization of EventProcessor");
00688       epInitialized_ = false;
00689     }
00690     if ( item == "outputEnabled") {
00691       if(outprev_ != outPut_) {
00692         LOG4CPLUS_WARN(getApplicationLogger(),
00693                        (outprev_ ? "Disabling " : "Enabling ")<<"global output");
00694         evtProcessor_->enableEndPaths(outPut_);
00695         outprev_ = outPut_;
00696       }
00697     }
00698     if (item == "globalInputPrescale") {
00699       LOG4CPLUS_WARN(getApplicationLogger(),
00700                      "Setting global input prescale has no effect "
00701                      <<"in this version of the code");
00702     }
00703     if ( item == "globalOutputPrescale") {
00704       LOG4CPLUS_WARN(getApplicationLogger(),
00705                      "Setting global output prescale has no effect "
00706                      <<"in this version of the code");
00707     }
00708   }
00709   
00710 }
00711 
00712 //______________________________________________________________________________
00713 void FUEventProcessor::subWeb(xgi::Input  *in, xgi::Output *out)
00714 {
00715   using namespace cgicc;
00716   pid_t pid = 0;
00717   std::ostringstream ost;
00718   ost << "&";
00719 
00720   Cgicc cgi(in);
00721   internal::MyCgi *mycgi = (internal::MyCgi*)in;
00722   for(std::map<std::string, std::string, std::less<std::string> >::iterator mit = 
00723         mycgi->getEnvironment().begin();
00724       mit != mycgi->getEnvironment().end(); mit++)
00725     ost << mit->first << "%" << mit->second << ";";
00726   std::vector<FormEntry> els = cgi.getElements() ;
00727   std::vector<FormEntry> el1;
00728   cgi.getElement("method",el1);
00729   std::vector<FormEntry> el2;
00730   cgi.getElement("process",el2);
00731   if(el1.size()!=0) {
00732     std::string meth = el1[0].getValue();
00733     if(el2.size()!=0) {
00734       unsigned int i = 0;
00735       std::string mod = el2[0].getValue();
00736       pid = atoi(mod.c_str()); // get the process id to be polled
00737       for(; i < subs_.size(); i++)
00738         if(subs_[i].pid()==pid) break;
00739       if(i>=subs_.size()){ //process was not found, let the browser know
00740         *out << "ERROR 404 : Process " << pid << " Not Found !" << std::endl;
00741         return;
00742       } 
00743       if(subs_[i].alive() != 1){
00744         *out << "ERROR 405 : Process " << pid << " Not Alive !" << std::endl;
00745         return;
00746       }
00747       MsgBuf msg1(meth.length()+ost.str().length()+1,MSQM_MESSAGE_TYPE_WEB);
00748       strncpy(msg1->mtext,meth.c_str(),meth.length());
00749       strncpy(msg1->mtext+meth.length(),ost.str().c_str(),ost.str().length());
00750       subs_[i].post(msg1,true);
00751       unsigned int keep_supersleep_original_value = superSleepSec_.value_;
00752       superSleepSec_.value_=10*keep_supersleep_original_value;
00753       MsgBuf msg(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_WEB);
00754       bool done = false;
00755       std::vector<char *>pieces;
00756       while(!done){
00757         unsigned long retval1 = subs_[i].rcvNonBlocking(msg,true);
00758         if(retval1 == MSGQ_MESSAGE_TYPE_RANGE){
00759 	  ::sleep(1);
00760           continue;
00761         }
00762         unsigned int nbytes = atoi(msg->mtext);
00763         if(nbytes < MAX_PIPE_BUFFER_SIZE) done = true; // this will break the while loop
00764         char *buf= new char[nbytes];
00765         ssize_t retval = read(anonymousPipe_[PIPE_READ],buf,nbytes);
00766         if(retval!=nbytes) std::cout 
00767           << "CAREFUL HERE, read less bytes than expected from pipe in subWeb" << std::endl;
00768         pieces.push_back(buf);
00769       }
00770       superSleepSec_.value_=keep_supersleep_original_value;
00771       for(unsigned int j = 0; j < pieces.size(); j++){
00772         *out<<pieces[j];    // chain the buffers into the output strstream
00773         delete[] pieces[j]; //make sure to release all buffers used for reading the pipe
00774       }
00775     }
00776   }
00777 }
00778 
00779 
00780 //______________________________________________________________________________
00781 void FUEventProcessor::defaultWebPage(xgi::Input  *in, xgi::Output *out)
00782   throw (xgi::exception::Exception)
00783 {
00784 
00785 
00786   *out << "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">" 
00787        << "<html><head><title>" << getApplicationDescriptor()->getClassName() << (nbSubProcesses_.value_ > 0 ? "MP " : " ") 
00788        << getApplicationDescriptor()->getInstance() << "</title>"
00789        << "<meta https-equiv=\"REFRESH\" content=\"0;url=/evf/html/defaultBasePage.html\">"
00790        << "</head></html>";
00791 }
00792 
00793 
00794 //______________________________________________________________________________
00795 
00796 
00797 void FUEventProcessor::spotlightWebPage(xgi::Input  *in, xgi::Output *out)
00798   throw (xgi::exception::Exception)
00799 {
00800 
00801   std::string urn = getApplicationDescriptor()->getURN();
00802 
00803   *out << "<!-- base href=\"/" <<  urn
00804        << "\"> -->" << std::endl;
00805   *out << "<html>"                                                   << std::endl;
00806   *out << "<head>"                                                   << std::endl;
00807   *out << "<link type=\"text/css\" rel=\"stylesheet\"";
00808   *out << " href=\"/evf/html/styles.css\"/>"                   << std::endl;
00809   *out << "<title>" << getApplicationDescriptor()->getClassName() 
00810        << getApplicationDescriptor()->getInstance() 
00811        << " MAIN</title>"     << std::endl;
00812   *out << "</head>"                                                  << std::endl;
00813   *out << "<body>"                                                   << std::endl;
00814   *out << "<table border=\"0\" width=\"100%\">"                      << std::endl;
00815   *out << "<tr>"                                                     << std::endl;
00816   *out << "  <td align=\"left\">"                                    << std::endl;
00817   *out << "    <img"                                                 << std::endl;
00818   *out << "     align=\"middle\""                                    << std::endl;
00819   *out << "     src=\"/evf/images/spoticon.jpg\""                            << std::endl;
00820   *out << "     alt=\"main\""                                        << std::endl;
00821   *out << "     width=\"64\""                                        << std::endl;
00822   *out << "     height=\"64\""                                       << std::endl;
00823   *out << "     border=\"\"/>"                                       << std::endl;
00824   *out << "    <b>"                                                  << std::endl;
00825   *out << getApplicationDescriptor()->getClassName() 
00826        << getApplicationDescriptor()->getInstance()                  << std::endl;
00827   *out << "      " << fsm_.stateName()->toString()                   << std::endl;
00828   *out << "    </b>"                                                 << std::endl;
00829   *out << "  </td>"                                                  << std::endl;
00830   *out << "  <td width=\"32\">"                                      << std::endl;
00831   *out << "    <a href=\"/urn:xdaq-application:lid=3\">"             << std::endl;
00832   *out << "      <img"                                               << std::endl;
00833   *out << "       align=\"middle\""                                  << std::endl;
00834   *out << "       src=\"/hyperdaq/images/HyperDAQ.jpg\""             << std::endl;
00835   *out << "       alt=\"HyperDAQ\""                                  << std::endl;
00836   *out << "       width=\"32\""                                      << std::endl;
00837   *out << "       height=\"32\""                                     << std::endl;
00838   *out << "       border=\"\"/>"                                     << std::endl;
00839   *out << "    </a>"                                                 << std::endl;
00840   *out << "  </td>"                                                  << std::endl;
00841   *out << "  <td width=\"32\">"                                      << std::endl;
00842   *out << "  </td>"                                                  << std::endl;
00843   *out << "  <td width=\"32\">"                                      << std::endl;
00844   *out << "    <a href=\"/" << urn << "/\">"                         << std::endl;
00845   *out << "      <img"                                               << std::endl;
00846   *out << "       align=\"middle\""                                  << std::endl;
00847   *out << "       src=\"/evf/images/epicon.jpg\""                    << std::endl;
00848   *out << "       alt=\"main\""                                      << std::endl;
00849   *out << "       width=\"32\""                                      << std::endl;
00850   *out << "       height=\"32\""                                     << std::endl;
00851   *out << "       border=\"\"/>"                                     << std::endl;
00852   *out << "    </a>"                                                 << std::endl;
00853   *out << "  </td>"                                                  << std::endl;
00854   *out << "</tr>"                                                    << std::endl;
00855   *out << "</table>"                                                 << std::endl;
00856 
00857   *out << "<hr/>"                                                    << std::endl;
00858   
00859   std::ostringstream ost;
00860   if(myProcess_) 
00861     ost << "/SubWeb?process=" << getpid() << "&method=moduleWeb&";
00862   else
00863     ost << "/moduleWeb?";
00864   urn += ost.str();
00865   if(evtProcessor_ && (myProcess_ || nbSubProcesses_.value_==0))
00866     evtProcessor_.taskWebPage(in,out,urn);
00867   else if(evtProcessor_)
00868     evtProcessor_.summaryWebPage(in,out,urn);
00869   else
00870     *out << "<td>HLT Unconfigured</td>" << std::endl;
00871   *out << "</table>"                                                 << std::endl;
00872   
00873   *out << "<br><textarea rows=" << 10 << " cols=80 scroll=yes>"      << std::endl;
00874   *out << configuration_                                             << std::endl;
00875   *out << "</textarea><P>"                                           << std::endl;
00876   
00877   *out << "</body>"                                                  << std::endl;
00878   *out << "</html>"                                                  << std::endl;
00879 
00880 
00881 }
00882 void FUEventProcessor::scalersWeb(xgi::Input  *in, xgi::Output *out)
00883   throw (xgi::exception::Exception)
00884 {
00885 
00886   out->getHTTPResponseHeader().addHeader( "Content-Type",
00887                                           "application/octet-stream" );
00888   out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
00889                                           "binary" );
00890   if(evtProcessor_ != 0){
00891     out->write( (char*)(evtProcessor_.getPackedTriggerReportAsStruct()), sizeof(TriggerReportStatic) );
00892   }
00893 }
00894 
00895 void FUEventProcessor::pathNames(xgi::Input  *in, xgi::Output *out)
00896   throw (xgi::exception::Exception)
00897 {
00898 
00899   if(evtProcessor_ != 0){
00900     xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00901     if(legenda !=0){
00902       std::string slegenda = ((xdata::String*)legenda)->value_;
00903       *out << slegenda << std::endl;
00904     }
00905   }
00906 }
00907 
00908 
00909 void FUEventProcessor::setAttachDqmToShm() throw (evf::Exception)  
00910 {
00911   std::string errmsg;
00912   try {
00913     edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
00914     if(edm::Service<FUShmDQMOutputService>().isAvailable())
00915       edm::Service<FUShmDQMOutputService>()->setAttachToShm();
00916   }
00917   catch (cms::Exception& e) {
00918     errmsg = "Failed to set to attach DQM service to shared memory: " + (std::string)e.what();
00919   }
00920   if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
00921 }
00922 
00923 
00924 void FUEventProcessor::attachDqmToShm() throw (evf::Exception)  
00925 {
00926   std::string errmsg;
00927   bool success = false;
00928   try {
00929     edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
00930     if(edm::Service<FUShmDQMOutputService>().isAvailable())
00931       success = edm::Service<FUShmDQMOutputService>()->attachToShm();
00932     if (!success) errmsg = "Failed to attach DQM service to shared memory";
00933   }
00934   catch (cms::Exception& e) {
00935     errmsg = "Failed to attach DQM service to shared memory: " + (std::string)e.what();
00936   }
00937   if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
00938 }
00939 
00940 
00941 
00942 void FUEventProcessor::detachDqmFromShm() throw (evf::Exception)
00943 {
00944   std::string errmsg;
00945   bool success = false;
00946   try {
00947     edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
00948     if(edm::Service<FUShmDQMOutputService>().isAvailable())
00949       success = edm::Service<FUShmDQMOutputService>()->detachFromShm();
00950     if (!success) errmsg = "Failed to detach DQM service from shared memory";
00951   }
00952   catch (cms::Exception& e) {
00953     errmsg = "Failed to detach DQM service from shared memory: " + (std::string)e.what();
00954   }
00955   if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
00956 }
00957 
00958 
00959 std::string FUEventProcessor::logsAsString()
00960 {
00961   std::ostringstream oss;
00962   if(logWrap_)
00963     {
00964       for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
00965         oss << logRing_[i] << std::endl;
00966       for(unsigned int i = 0; i <  logRingIndex_; i++)
00967         oss << logRing_[i] << std::endl;
00968     }
00969   else
00970       for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
00971         oss << logRing_[i] << std::endl;
00972     
00973   return oss.str();
00974 }
00975   
00976 void FUEventProcessor::localLog(std::string m)
00977 {
00978   timeval tv;
00979 
00980   gettimeofday(&tv,0);
00981   tm *uptm = localtime(&tv.tv_sec);
00982   char datestring[256];
00983   strftime(datestring, sizeof(datestring),"%c", uptm);
00984 
00985   if(logRingIndex_ == 0){logWrap_ = true; logRingIndex_ = logRingSize_;}
00986   logRingIndex_--;
00987   std::ostringstream timestamp;
00988   timestamp << " at " << datestring;
00989   m += timestamp.str();
00990   logRing_[logRingIndex_] = m;
00991 }
00992 
00993 void FUEventProcessor::startSupervisorLoop()
00994 {
00995   try {
00996     wlSupervising_=
00997       toolbox::task::getWorkLoopFactory()->getWorkLoop("Supervisor",
00998                                                        "waiting");
00999     if (!wlSupervising_->isActive()) wlSupervising_->activate();
01000     asSupervisor_ = toolbox::task::bind(this,&FUEventProcessor::supervisor,
01001                                         "Supervisor");
01002     wlSupervising_->submit(asSupervisor_);
01003     supervising_ = true;
01004   }
01005   catch (xcept::Exception& e) {
01006     std::string msg = "Failed to start workloop 'Supervisor'.";
01007     XCEPT_RETHROW(evf::Exception,msg,e);
01008   }
01009 }
01010 
01011 void FUEventProcessor::startReceivingLoop()
01012 {
01013   try {
01014     wlReceiving_=
01015       toolbox::task::getWorkLoopFactory()->getWorkLoop("Receiving",
01016                                                        "waiting");
01017     if (!wlReceiving_->isActive()) wlReceiving_->activate();
01018     asReceiveMsgAndExecute_ = toolbox::task::bind(this,&FUEventProcessor::receiving,
01019                                         "Receiving");
01020     wlReceiving_->submit(asReceiveMsgAndExecute_);
01021     receiving_ = true;
01022   }
01023   catch (xcept::Exception& e) {
01024     std::string msg = "Failed to start workloop 'Receiving'.";
01025     XCEPT_RETHROW(evf::Exception,msg,e);
01026   }
01027 }
01028 void FUEventProcessor::startReceivingMonitorLoop()
01029 {
01030   try {
01031     wlReceivingMonitor_=
01032       toolbox::task::getWorkLoopFactory()->getWorkLoop("ReceivingM",
01033                                                        "waiting");
01034     if (!wlReceivingMonitor_->isActive()) 
01035       wlReceivingMonitor_->activate();
01036     asReceiveMsgAndRead_ = 
01037       toolbox::task::bind(this,&FUEventProcessor::receivingAndMonitor,
01038                           "ReceivingM");
01039     wlReceivingMonitor_->submit(asReceiveMsgAndRead_);
01040     receivingM_ = true;
01041   }
01042   catch (xcept::Exception& e) {
01043     std::string msg = "Failed to start workloop 'ReceivingM'.";
01044     XCEPT_RETHROW(evf::Exception,msg,e);
01045   }
01046 }
01047 
01048 bool FUEventProcessor::receiving(toolbox::task::WorkLoop *)
01049 {
01050   MsgBuf msg;
01051   try{
01052     myProcess_->rcvSlave(msg,false); //will receive only messages from Master
01053     if(msg->mtype==MSQM_MESSAGE_TYPE_STOP)
01054       {
01055         pthread_mutex_lock(&stop_lock_);
01056         fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
01057         try{
01058           LOG4CPLUS_DEBUG(getApplicationLogger(),
01059                           "Trying to create message service presence ");
01060           edm::PresenceFactory *pf = edm::PresenceFactory::get();
01061           if(pf != 0) {
01062             pf->makePresence("MessageServicePresence").release();
01063           }
01064           else {
01065             LOG4CPLUS_ERROR(getApplicationLogger(),
01066                             "Unable to create message service presence ");
01067           }
01068         } 
01069         catch(...) {
01070           LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
01071         }
01072         stopClassic(); // call the normal sequence of stopping - as this is allowed to fail provisions must be made ...@@@EM
01073         MsgBuf msg1(0,MSQS_MESSAGE_TYPE_STOP);
01074         myProcess_->postSlave(msg1,false);
01075         pthread_mutex_unlock(&stop_lock_);
01076         fclose(stdout);
01077         fclose(stderr);
01078         _exit(EXIT_SUCCESS);
01079       }
01080     if(msg->mtype==MSQM_MESSAGE_TYPE_FSTOP)
01081       _exit(EXIT_SUCCESS);
01082   }
01083   catch(evf::Exception &e){}
01084   return true;
01085 }
01086 
01087 bool FUEventProcessor::supervisor(toolbox::task::WorkLoop *)
01088 {
01089   pthread_mutex_lock(&stop_lock_);
01090   if(subs_.size()!=nbSubProcesses_.value_)
01091     {
01092       pthread_mutex_lock(&pickup_lock_);
01093       if(subs_.size()!=nbSubProcesses_.value_) {
01094         subs_.resize(nbSubProcesses_.value_);
01095         spMStates_.resize(nbSubProcesses_.value_);
01096         spmStates_.resize(nbSubProcesses_.value_);
01097         for(unsigned int i = 0; i < spMStates_.size(); i++)
01098           {
01099             spMStates_[i] = edm::event_processor::sInit; 
01100             spmStates_[i] = 0; 
01101           }
01102       }
01103       pthread_mutex_unlock(&pickup_lock_);
01104     }
01105   bool running = fsm_.stateName()->toString()=="Enabled";
01106   bool stopping = fsm_.stateName()->toString()=="stopping";
01107   for(unsigned int i = 0; i < subs_.size(); i++)
01108     {
01109       if(subs_[i].alive()==-1000) continue;
01110       int sl;
01111       pid_t sub_pid = subs_[i].pid();
01112       pid_t killedOrNot = waitpid(sub_pid,&sl,WNOHANG);
01113 
01114       if(killedOrNot && killedOrNot==sub_pid) {
01115         pthread_mutex_lock(&pickup_lock_);
01116         //check if out of range or recreated (enable can clear vector)
01117         if (i<subs_.size() && subs_[i].alive()!=-1000) {
01118           subs_[i].setStatus((WIFEXITED(sl) != 0 ? 0 : -1));
01119           std::ostringstream ost;
01120           if(subs_[i].alive()==0) ost << " process exited with status " << WEXITSTATUS(sl);
01121           else if(WIFSIGNALED(sl)!=0) ost << " process terminated with signal " << WTERMSIG(sl);
01122           else ost << " process stopped ";
01123           subs_[i].countdown()=slaveRestartDelaySecs_.value_;
01124           subs_[i].setReasonForFailed(ost.str());
01125           spMStates_[i] = evtProcessor_.notstarted_state_code();
01126           spmStates_[i] = 0;
01127           std::ostringstream ost1;
01128           ost1 << "-E- Slave " << subs_[i].pid() << ost.str();
01129           localLog(ost1.str());
01130           if(!autoRestartSlaves_.value_) subs_[i].disconnect();
01131         }
01132         pthread_mutex_unlock(&pickup_lock_);
01133       }
01134     }
01135   pthread_mutex_unlock(&stop_lock_);    
01136   if(stopping) return true; // if in stopping we are done
01137 
01138   if(running && edm_init_done_)
01139     {
01140       // if enabled, this loop will periodically check if dead slaves countdown has expired and restart them
01141       // this is only active while running, hence, the stop lock is acquired and only released at end of loop
01142       if(autoRestartSlaves_.value_){
01143         pthread_mutex_lock(&stop_lock_); //lockout slave killing at stop while you check for restarts
01144         for(unsigned int i = 0; i < subs_.size(); i++)
01145           {
01146             if(subs_[i].alive() != 1){
01147               if(subs_[i].countdown() == 0)
01148                 {
01149                   if(subs_[i].restartCount()>2){
01150                     LOG4CPLUS_WARN(getApplicationLogger()," Not restarting subprocess in slot " << i 
01151                                    << " - maximum restart count reached");
01152                     std::ostringstream ost1;
01153                     ost1 << "-W- Dead Process in slot " << i << " reached maximum restart count"; 
01154                     localLog(ost1.str());
01155                     subs_[i].countdown()--;
01156                     XCEPT_DECLARE(evf::Exception,
01157                                   sentinelException, ost1.str());
01158                     notifyQualified("error",sentinelException);
01159                     subs_[i].disconnect();
01160                     continue;
01161                   }
01162                   subs_[i].restartCount()++;
01163                   if (forkInEDM_.value_) {
01164                     restartForkInEDM(i);
01165                   }
01166                   else {
01167                     pid_t rr = subs_[i].forkNew();
01168                     if(rr==0)
01169                     {
01170                       myProcess_=&subs_[i];
01171                       scalersUpdates_ = 0;
01172                       int retval = pthread_mutex_destroy(&stop_lock_);
01173                       if(retval != 0) perror("error");
01174                       retval = pthread_mutex_init(&stop_lock_,0);
01175                       if(retval != 0) perror("error");
01176                       fsm_.disableRcmsStateNotification();
01177                       fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
01178                       fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition
01179                       fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition
01180                       try{
01181                         xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
01182                         if(lsid) {
01183                           ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died
01184                         }
01185                       }
01186                       catch(...){
01187                         std::cout << "trouble with lsindex during restart" << std::endl;
01188                       }
01189                       try{
01190                         xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
01191                         if(lstb) {
01192                           ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting
01193                         }
01194                       }
01195                       catch(...){
01196                         std::cout << "trouble with resetting flag for eol recovery " << std::endl;
01197                       }
01198 
01199                       evtProcessor_.adjustLsIndexForRestart();
01200                       evtProcessor_.resetPackedTriggerReport();
01201                       enableMPEPSlave();
01202                       return false; // exit the supervisor loop immediately in the child !!!
01203                     }
01204                   else
01205                     {
01206                       std::ostringstream ost1;
01207                       ost1 << "-I- New Process " << rr << " forked for slot " << i; 
01208                       localLog(ost1.str());
01209                     }
01210                   }
01211                 }
01212               if(subs_[i].countdown()>=0) subs_[i].countdown()--;
01213             }
01214           }
01215         pthread_mutex_unlock(&stop_lock_);
01216       } // finished handling replacement of dead slaves once they've been reaped
01217     }
01218   xdata::Serializable *lsid = 0; 
01219   xdata::Serializable *psid = 0;
01220   xdata::Serializable *dqmp = 0;
01221   xdata::UnsignedInteger32 *dqm = 0;
01222 
01223 
01224   
01225   if(running && edm_init_done_){  
01226     try{
01227       lsid = applicationInfoSpace_->find("lumiSectionIndex");
01228       psid = applicationInfoSpace_->find("prescaleSetIndex");
01229       nbProcessed = monitorInfoSpace_->find("nbProcessed");
01230       nbAccepted  = monitorInfoSpace_->find("nbAccepted");
01231       dqmp = applicationInfoSpace_-> find("nbDqmUpdates");      
01232     }
01233     catch(xdata::exception::Exception e){
01234       LOG4CPLUS_INFO(getApplicationLogger(),"could not retrieve some data - " << e.what());    
01235     }
01236 
01237     try{
01238       if(nbProcessed !=0 && nbAccepted !=0)
01239         {
01240           xdata::UnsignedInteger32*nbp = ((xdata::UnsignedInteger32*)nbProcessed);
01241           xdata::UnsignedInteger32*nba = ((xdata::UnsignedInteger32*)nbAccepted);
01242           xdata::UnsignedInteger32*ls  = ((xdata::UnsignedInteger32*)lsid);
01243           xdata::UnsignedInteger32*ps  = ((xdata::UnsignedInteger32*)psid);
01244           if(dqmp!=0)
01245             dqm = (xdata::UnsignedInteger32*)dqmp;
01246           if(dqm) dqm->value_ = 0;
01247           nbTotalDQM_ = 0;
01248           nbp->value_ = 0;
01249           nba->value_ = 0;
01250           nblive_ = 0;
01251           nbdead_ = 0;
01252           scalersUpdates_ = 0;
01253 
01254           for(unsigned int i = 0; i < subs_.size(); i++)
01255             {
01256               if(subs_[i].alive()>0)
01257                 {
01258                   nblive_++;
01259                   try{
01260                     subs_[i].post(master_message_prg_,true);
01261                     
01262                     unsigned long retval = subs_[i].rcvNonBlocking(master_message_prr_,true);
01263                     if(retval == (unsigned long) master_message_prr_->mtype){
01264                       prg* p = (struct prg*)(master_message_prr_->mtext);
01265                       subs_[i].setParams(p);
01266                       spMStates_[i] = p->Ms;
01267                       spmStates_[i] = p->ms;
01268                       cpustat_->addEntry(p->ms);
01269                       if(!subs_[i].inInconsistentState() && 
01270                          (p->Ms == edm::event_processor::sError 
01271                           || p->Ms == edm::event_processor::sInvalid
01272                           || p->Ms == edm::event_processor::sStopping))
01273                         {
01274                           std::ostringstream ost;
01275                           ost << "edm::eventprocessor slot " << i << " process id " 
01276                               << subs_[i].pid() << " not in Running state : Mstate=" 
01277                               << evtProcessor_.stateNameFromIndex(p->Ms) << " mstate="
01278                               << evtProcessor_.moduleNameFromIndex(p->ms) 
01279                               << " - Look into possible error messages from HLT process";
01280                           LOG4CPLUS_WARN(getApplicationLogger(),ost.str());
01281                         }
01282                       nbp->value_ += subs_[i].params().nbp;
01283                       nba->value_  += subs_[i].params().nba;
01284                       if(dqm)dqm->value_ += p->dqm;
01285                       nbTotalDQM_ +=  p->dqm;
01286                       scalersUpdates_ += p->trp;
01287                       if(p->ls > ls->value_) ls->value_ = p->ls;
01288                       if(p->ps != ps->value_) ps->value_ = p->ps;
01289                     }
01290                     else{
01291                       nbp->value_ += subs_[i].get_save_nbp();
01292                       nba->value_ += subs_[i].get_save_nba();
01293                     }
01294                   } 
01295                   catch(evf::Exception &e){
01296                     LOG4CPLUS_INFO(getApplicationLogger(),
01297                                    "could not send/receive msg on slot " 
01298                                    << i << " - " << e.what());    
01299                   }
01300                     
01301                 }
01302               else
01303                 {
01304                   nbp->value_ += subs_[i].get_save_nbp();
01305                   nba->value_ += subs_[i].get_save_nba();
01306                   nbdead_++;
01307                 }
01308             }
01309           if(nbp->value_>64){//have some slaves already processed more than one event ? (eventually make this == number of raw cells)
01310             for(unsigned int i = 0; i < subs_.size(); i++)
01311               {
01312                 if(subs_[i].params().nbp == 0){ // a slave has processed 0 events 
01313                   // check that the process is not stuck
01314                   if(subs_[i].alive()>0 && subs_[i].params().ms == 0) // the process is seen alive but in us=Invalid(0)
01315                     {
01316                       subs_[i].found_invalid();//increase the "found_invalid" counter
01317                       if(subs_[i].nfound_invalid() > 60){ //wait x monitor cycles (~1 min a good time ?) before doing something about a stuck slave
01318                         MsgBuf msg3(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_FSTOP);      // send a force-stop signal             
01319                         subs_[i].post(msg3,false);
01320                         std::ostringstream ost1;
01321                         ost1 << "-W- Process in slot " << i << " Never reached the running state - forcestopping it"; 
01322                         localLog(ost1.str());
01323                         LOG4CPLUS_ERROR(getApplicationLogger(),ost1.str());    
01324                         XCEPT_DECLARE(evf::Exception,
01325                                       sentinelException, ost1.str());
01326                         notifyQualified("error",sentinelException);
01327 
01328                       }
01329                     }
01330                 }
01331               }
01332           }
01333         }
01334     }
01335     catch(std::exception &e){
01336       LOG4CPLUS_INFO(getApplicationLogger(),"std exception - " << e.what());    
01337     }
01338     catch(...){
01339       LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception ");    
01340     }
01341   }
01342   else{
01343     for(unsigned int i = 0; i < subs_.size(); i++)
01344       {
01345         if(subs_[i].alive()==-1000)
01346           {
01347             spMStates_[i] = edm::event_processor::sInit;
01348             spmStates_[i] = 0;
01349           }
01350       }
01351   }
01352   try{
01353     monitorInfoSpace_->lock();
01354     monitorInfoSpace_->fireItemGroupChanged(names_,0);
01355     monitorInfoSpace_->unlock();
01356   }
01357   catch(xdata::exception::Exception &e)
01358     {
01359       LOG4CPLUS_ERROR(log_, "Exception from fireItemGroupChanged: " << e.what());
01360       //        localLog(e.what());
01361     }
01362   ::sleep(superSleepSec_.value_);       
01363   return true;
01364 }
01365 
01366 void FUEventProcessor::startScalersWorkLoop() throw (evf::Exception)
01367 {
01368   try {
01369     wlScalers_=
01370       toolbox::task::getWorkLoopFactory()->getWorkLoop("Scalers",
01371                                                        "waiting");
01372     if (!wlScalers_->isActive()) wlScalers_->activate();
01373     asScalers_ = toolbox::task::bind(this,&FUEventProcessor::scalers,
01374                                      "Scalers");
01375     
01376   wlScalers_->submit(asScalers_);
01377   wlScalersActive_ = true;
01378   }
01379   catch (xcept::Exception& e) {
01380     std::string msg = "Failed to start workloop 'Scalers'.";
01381     XCEPT_RETHROW(evf::Exception,msg,e);
01382   }
01383 }
01384 
01385 //______________________________________________________________________________
01386 
01387 void FUEventProcessor::startSummarizeWorkLoop() throw (evf::Exception)
01388 {
01389   try {
01390     wlSummarize_=
01391       toolbox::task::getWorkLoopFactory()->getWorkLoop("Summary",
01392                                                        "waiting");
01393     if (!wlSummarize_->isActive()) wlSummarize_->activate();
01394     
01395     asSummarize_ = toolbox::task::bind(this,&FUEventProcessor::summarize,
01396                                        "Summary");
01397 
01398     wlSummarize_->submit(asSummarize_);
01399     wlSummarizeActive_ = true;
01400   }
01401   catch (xcept::Exception& e) {
01402     std::string msg = "Failed to start workloop 'Summarize'.";
01403     XCEPT_RETHROW(evf::Exception,msg,e);
01404   }
01405 }
01406 
01407 //______________________________________________________________________________
01408 
01409 bool FUEventProcessor::scalers(toolbox::task::WorkLoop* wl)
01410 {
01411   if(evtProcessor_)
01412     {
01413       if(!evtProcessor_.getTriggerReport(true)) {
01414         wlScalersActive_ = false;
01415         return false;
01416       }
01417     }
01418   else
01419     {
01420       std::cout << getpid()<< " Scalers workloop, bailing out, no evtProcessor " << std::endl;
01421       wlScalersActive_ = false;
01422       return false;
01423     }
01424   if(myProcess_) 
01425     {
01426       //      std::cout << getpid() << "going to post on control queue from scalers" << std::endl;
01427       int ret = myProcess_->postSlave(evtProcessor_.getPackedTriggerReport(),false);
01428       if(ret!=0)      std::cout << "scalers workloop, error posting to sqs_ " << errno << std::endl;
01429       scalersUpdates_++;
01430     }
01431   else
01432     evtProcessor_.fireScalersUpdate();
01433   return true;
01434 }
01435 
01436 //______________________________________________________________________________
01437 bool FUEventProcessor::summarize(toolbox::task::WorkLoop* wl)
01438 {
01439   evtProcessor_.resetPackedTriggerReport();
01440   bool atLeastOneProcessUpdatedSuccessfully = false;
01441   int msgCount = 0;
01442   for (unsigned int i = 0; i < subs_.size(); i++)
01443     {
01444       if(subs_[i].alive()>0)
01445         {
01446           int ret = 0;
01447           if(subs_[i].check_postponed_trigger_update(master_message_trr_,
01448                                                      evtProcessor_.getLumiSectionReferenceIndex()))
01449             {
01450               ret = MSQS_MESSAGE_TYPE_TRR;
01451               std::cout << "using postponed report from slot " << i << " for ls " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01452             }
01453           else{
01454             bool insync = false;
01455             bool exception_caught = false;
01456             while(!insync){
01457               try{
01458                 ret = subs_[i].rcv(master_message_trr_,false);
01459               }
01460               catch(evf::Exception &e)
01461                 {
01462                   std::cout << "exception in msgrcv on " << i 
01463                             << " " << subs_[i].alive() << " " << strerror(errno) << std::endl;
01464                   exception_caught = true;
01465                   break;
01466                   //do nothing special
01467                 }
01468               if(ret==MSQS_MESSAGE_TYPE_TRR) {
01469                 TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
01470                 if(trp->lumiSection >= evtProcessor_.getLumiSectionReferenceIndex()){
01471                   insync = true;
01472                 }
01473               }
01474             }
01475             if(exception_caught) continue;
01476           }
01477           msgCount++;
01478           if(ret==MSQS_MESSAGE_TYPE_TRR) {
01479             TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
01480             if(trp->lumiSection > evtProcessor_.getLumiSectionReferenceIndex()){
01481               std::cout << "postpone handling of msg from slot " << i << " with Ls " <<  trp->lumiSection
01482                         << " should be " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01483               subs_[i].add_postponed_trigger_update(master_message_trr_);
01484             }else{
01485               atLeastOneProcessUpdatedSuccessfully = true;
01486               evtProcessor_.sumAndPackTriggerReport(master_message_trr_);
01487             }
01488           }
01489           else std::cout << "msgrcv returned error " << errno << std::endl;
01490         }
01491     }
01492   if(atLeastOneProcessUpdatedSuccessfully){
01493     nbSubProcessesReporting_.value_ = msgCount;
01494     evtProcessor_.getPackedTriggerReportAsStruct()->nbExpected = nbSubProcesses_.value_;
01495     evtProcessor_.getPackedTriggerReportAsStruct()->nbReporting = nbSubProcessesReporting_.value_;
01496     evtProcessor_.updateRollingReport();
01497     evtProcessor_.fireScalersUpdate();
01498   }
01499   else{
01500     LOG4CPLUS_WARN(getApplicationLogger(),"Summarize loop: no process updated successfully - sleep 10 seconds before trying again");          
01501     if(msgCount==0) evtProcessor_.withdrawLumiSectionIncrement();
01502     nbSubProcessesReporting_.value_ = 0;
01503     ::sleep(10);
01504   }
01505   if(fsm_.stateName()->toString()!="Enabled"){
01506     wlScalersActive_ = false;
01507     return false;
01508   }
01509   //  cpustat_->printStat();
01510   if(iDieStatisticsGathering_.value_){
01511     try{
01512       TriggerReportStatic *trsp = evtProcessor_.getPackedTriggerReportAsStruct();
01513       cpustat_ ->setNproc(trsp->eventSummary.totalEvents);
01514       cpustat_ ->sendStat(evtProcessor_.getLumiSectionReferenceIndex());
01515       ratestat_->sendStat((unsigned char*)trsp,
01516                           sizeof(TriggerReportStatic),
01517                           evtProcessor_.getLumiSectionReferenceIndex());
01518     }catch(evf::Exception &e){
01519       LOG4CPLUS_INFO(getApplicationLogger(),"coud not send statistics"
01520                      << e.what());
01521     }
01522   }
01523   cpustat_->reset();
01524   return true;
01525 }
01526 
01527 
01528 
01529 bool FUEventProcessor::receivingAndMonitor(toolbox::task::WorkLoop *)
01530 {
01531   try{
01532     myProcess_->rcvSlave(slave_message_monitoring_,true); //will receive only messages from Master
01533     switch(slave_message_monitoring_->mtype)
01534       {
01535       case MSQM_MESSAGE_TYPE_MCS:
01536         {
01537           xgi::Input *in = 0;
01538           xgi::Output out;
01539           evtProcessor_.microState(in,&out);
01540           MsgBuf msg1(out.str().size(),MSQS_MESSAGE_TYPE_MCR);
01541           strncpy(msg1->mtext,out.str().c_str(),out.str().size());
01542           myProcess_->postSlave(msg1,true);
01543           break;
01544         }
01545       
01546       case MSQM_MESSAGE_TYPE_PRG:
01547         {
01548           xdata::Serializable *dqmp = 0;
01549           xdata::UnsignedInteger32 *dqm = 0;
01550           evtProcessor_.monitoring(0);
01551           try{
01552             dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
01553           }  catch(xdata::exception::Exception e){}
01554           if(dqmp!=0)
01555             dqm = (xdata::UnsignedInteger32*)dqmp;
01556 
01557           //      monitorInfoSpace_->lock();  
01558           prg * data           = (prg*)slave_message_prr_->mtext;
01559           data->ls             = evtProcessor_.lsid_;
01560           data->eols           = evtProcessor_.lastLumiUsingEol_;
01561           data->ps             = evtProcessor_.psid_;
01562           data->nbp            = evtProcessor_->totalEvents();
01563           data->nba            = evtProcessor_->totalEventsPassed();
01564           data->Ms             = evtProcessor_.epMAltState_.value_;
01565           data->ms             = evtProcessor_.epmAltState_.value_;
01566           if(dqm) data->dqm    = dqm->value_; else data->dqm = 0;
01567           data->trp            = scalersUpdates_;
01568           //      monitorInfoSpace_->unlock();  
01569           myProcess_->postSlave(slave_message_prr_,true);
01570           if(exitOnError_.value_)
01571           { 
01572             // after each monitoring cycle check if we are in inconsistent state and exit if configured to do so  
01573             //      std::cout << getpid() << "receivingAndMonitor: trying to acquire stop lock " << std::endl;
01574             if(data->Ms == edm::event_processor::sStopping || data->Ms == edm::event_processor::sError) 
01575               { 
01576                 bool running = true;
01577                 int count = 0;
01578                 while(running){
01579                   int retval = pthread_mutex_lock(&stop_lock_);
01580                   if(retval != 0) perror("error");
01581                   running = fsm_.stateName()->toString()=="Enabled";
01582                   if(count>5) _exit(-1);
01583                   pthread_mutex_unlock(&stop_lock_);
01584                   if(running) {::sleep(1); count++;}
01585                 }
01586               }
01587             
01588           }
01589           //      scalersUpdates_++;
01590           break;
01591         }
01592       case MSQM_MESSAGE_TYPE_WEB:
01593         {
01594           xgi::Input  *in = 0;
01595           xgi::Output out;
01596           unsigned int bytesToSend = 0;
01597           MsgBuf msg1(NUMERIC_MESSAGE_SIZE,MSQS_MESSAGE_TYPE_WEB);
01598           std::string query = slave_message_monitoring_->mtext;
01599           size_t pos = query.find_first_of("&");
01600           std::string method;
01601           std::string args;
01602           if(pos!=std::string::npos)  
01603             {
01604               method = query.substr(0,pos);
01605               args = query.substr(pos+1,query.length()-pos-1);
01606             }
01607           else
01608             method=query;
01609 
01610           if(method=="Spotlight")
01611             {
01612               spotlightWebPage(in,&out);
01613             }
01614           else if(method=="procStat")
01615             {
01616               procStat(in,&out);
01617             }
01618           else if(method=="moduleWeb")
01619             {
01620               internal::MyCgi mycgi;
01621               boost::char_separator<char> sep(";");
01622               boost::tokenizer<boost::char_separator<char> > tokens(args, sep);
01623               for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
01624                    tok_iter != tokens.end(); ++tok_iter){
01625                 size_t pos = (*tok_iter).find_first_of("%");
01626                 if(pos != std::string::npos){
01627                   std::string first  = (*tok_iter).substr(0    ,                        pos);
01628                   std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
01629                   mycgi.getEnvironment()[first]=second;
01630                 }
01631               }
01632               moduleWeb(&mycgi,&out);
01633             }
01634           else if(method=="Default")
01635             {
01636               defaultWebPage(in,&out);
01637             }
01638           else 
01639             {
01640               out << "Error 404!!!!!!!!" << std::endl;
01641             }
01642 
01643 
01644           bytesToSend = out.str().size();
01645           unsigned int cycle = 0;
01646           if(bytesToSend==0)
01647             {
01648               snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", bytesToSend);
01649               myProcess_->postSlave(msg1,true);
01650             }
01651           while(bytesToSend !=0){
01652             unsigned int msgSize = bytesToSend>MAX_PIPE_BUFFER_SIZE ? MAX_PIPE_BUFFER_SIZE : bytesToSend;
01653             write(anonymousPipe_[PIPE_WRITE],
01654                   out.str().c_str()+MAX_PIPE_BUFFER_SIZE*cycle,
01655                   msgSize);
01656             snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", msgSize);
01657             myProcess_->postSlave(msg1,true);
01658             bytesToSend -= msgSize;
01659             cycle++;
01660           }
01661           break;
01662         }
01663       case MSQM_MESSAGE_TYPE_TRP:
01664         {
01665           break;
01666         }
01667       }
01668   }
01669   catch(evf::Exception &e){std::cout << "exception caught in recevingM: " << e.what() << std::endl;}
01670   return true;
01671 }
01672 
01673 
01674 bool FUEventProcessor::enableCommon()
01675 {
01676   try {    
01677     if(hasShMem_) attachDqmToShm();
01678     int sc = 0;
01679     evtProcessor_->clearCounters();
01680     if(isRunNumberSetter_)
01681       evtProcessor_->setRunNumber(runNumber_.value_);
01682     else
01683       evtProcessor_->declareRunNumber(runNumber_.value_);
01684 
01685     try{
01686       ::sleep(1);
01687       evtProcessor_->runAsync();
01688       sc = evtProcessor_->statusAsync();
01689     }
01690     catch(cms::Exception &e) {
01691       reasonForFailedState_ = e.explainSelf();
01692       fsm_.fireFailed(reasonForFailedState_,this);
01693       localLog(reasonForFailedState_);
01694       return false;
01695     }    
01696     catch(std::exception &e) {
01697       reasonForFailedState_  = e.what();
01698       fsm_.fireFailed(reasonForFailedState_,this);
01699       localLog(reasonForFailedState_);
01700       return false;
01701     }
01702     catch(...) {
01703       reasonForFailedState_ = "Unknown Exception";
01704       fsm_.fireFailed(reasonForFailedState_,this);
01705       localLog(reasonForFailedState_);
01706       return false;
01707     }
01708     if(sc != 0) {
01709       std::ostringstream oss;
01710       oss<<"EventProcessor::runAsync returned status code " << sc;
01711       reasonForFailedState_ = oss.str();
01712       fsm_.fireFailed(reasonForFailedState_,this);
01713       localLog(reasonForFailedState_);
01714       return false;
01715     }
01716   }
01717   catch (xcept::Exception &e) {
01718     reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
01719     fsm_.fireFailed(reasonForFailedState_,this);
01720     localLog(reasonForFailedState_);
01721     return false;
01722   }
01723   try{
01724     fsm_.fireEvent("EnableDone",this);
01725   }
01726   catch (xcept::Exception &e) {
01727     std::cout << "exception " << (std::string)e.what() << std::endl;
01728     throw;
01729   }
01730 
01731   return false;
01732 }
01733 
01734 void FUEventProcessor::forkProcessFromEDM_helper(void * addr) {
01735   ((FUEventProcessor*)addr)->forkProcessesFromEDM();
01736 }
01737 
01738 void FUEventProcessor::forkProcessesFromEDM() {
01739 
01740   moduleweb::ForkParams * forkParams = &(forkInfoObj_->forkParams);
01741   unsigned int forkFrom=0;
01742   unsigned int forkTo=nbSubProcesses_.value_;
01743   if (forkParams->slotId>=0) {
01744     forkFrom=forkParams->slotId;
01745     forkTo=forkParams->slotId+1;
01746   }
01747 
01748   //before fork, make sure to disconnect output modules from Shm
01749   try {
01750     if (sorRef_) {
01751       std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
01752       for (unsigned int i=0;i<shmOutputs.size();i++) {
01753         //unregister PID from ShmBuffer/RB
01754         shmOutputs[i]->unregisterFromShm();
01755         //disconnect from Shm
01756         shmOutputs[i]->stop();
01757       }
01758     }
01759   }
01760   catch (std::exception &e)
01761   {
01762     reasonForFailedState_ =  (std::string)"Thrown exception while disconnecting ShmOutputModule from Shm: " + e.what();
01763     LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
01764     fsm_.fireFailed(reasonForFailedState_,this);
01765     localLog(reasonForFailedState_);
01766   }
01767   catch (...) {
01768     reasonForFailedState_ =  "Thrown unknown exception while disconnecting ShmOutputModule from Shm: ";
01769     LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
01770     fsm_.fireFailed(reasonForFailedState_,this);
01771     localLog(reasonForFailedState_);
01772   }
01773 
01774   //fork loop
01775   for(unsigned int i=forkFrom; i<forkTo; i++)
01776   {
01777 
01778     int retval = subs_[i].forkNew();
01779     if(retval==0)
01780     {
01781 
01782       forkParams->isMaster=0;
01783       myProcess_ = &subs_[i];
01784       // dirty hack: delete/recreate global binary semaphore for later use in child
01785       delete toolbox::mem::_s_mutex_ptr_;
01786       toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
01787       int retval = pthread_mutex_destroy(&stop_lock_);
01788       if(retval != 0) perror("error");
01789       retval = pthread_mutex_init(&stop_lock_,0);
01790       if(retval != 0) perror("error");
01791       fsm_.disableRcmsStateNotification();
01792 
01793       try{
01794         LOG4CPLUS_DEBUG(getApplicationLogger(),
01795             "Trying to create message service presence ");
01796         //release the presense factory in master
01797         edm::PresenceFactory *pf = edm::PresenceFactory::get();
01798         if(pf != 0) {
01799           pf->makePresence("MessageServicePresence").release();
01800         }
01801         else {
01802           LOG4CPLUS_ERROR(getApplicationLogger(),
01803               "Unable to create message service presence ");
01804         }
01805       }
01806       catch(...) {
01807         LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception in MessageServicePresence");
01808       }
01809 
01810       ML::MLlog4cplus::setAppl(this);
01811 
01812       //reconnect to Shm from output modules
01813       try {
01814         if (sorRef_) {
01815           std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
01816           for (unsigned int i=0;i<shmOutputs.size();i++)
01817             shmOutputs[i]->start();
01818         }
01819       }
01820       catch (...)
01821       {
01822         LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception (ShmOutputModule sending InitMsg (pid:"<<getpid() <<")");
01823       }
01824 
01825       if (forkParams->restart) {
01826         //do restart things
01827         scalersUpdates_ = 0;
01828         fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
01829         fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition
01830         fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition
01831         try{
01832           xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
01833           if(lsid) {
01834             ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died
01835           }
01836         }
01837         catch(...){
01838           std::cout << "trouble with lsindex during restart" << std::endl;
01839         }
01840         try{
01841           xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
01842           if(lstb) {
01843             ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting
01844           }
01845         }
01846         catch(...){
01847           std::cout << "trouble with resetting flag for eol recovery " << std::endl;
01848         }
01849         evtProcessor_.adjustLsIndexForRestart();
01850         evtProcessor_.resetPackedTriggerReport();
01851       }
01852 
01853       //start other threads
01854       startReceivingLoop();
01855       startReceivingMonitorLoop();
01856       startScalersWorkLoop();
01857       while(!evtProcessor_.isWaitingForLs())
01858         ::usleep(100000);//wait for scalers loop to start
01859 
01860       //connect DQMShmOutputModule
01861       if(hasShMem_) attachDqmToShm();
01862 
01863       //catch transition error if we are already Enabled
01864       try {
01865         fsm_.fireEvent("EnableDone",this);
01866       }
01867       catch (...) {}
01868       //child return to DaqSource
01869       return ;
01870     }
01871     else {
01872       forkParams->isMaster=1;
01873       forkInfoObj_->forkParams.slotId=-1;
01874       forkInfoObj_->forkParams.restart=0;
01875       if (forkParams->restart) {
01876         std::ostringstream ost1;
01877         ost1 << "-I- New Process " << retval << " forked for slot " << forkParams->slotId;
01878         localLog(ost1.str());
01879       }
01880     }
01881   }
01882   edm_init_done_=true;
01883 }
01884 
01885 bool FUEventProcessor::enableForkInEDM() 
01886 {
01887   evtProcessor_.resetWaiting();
01888   try {
01889     //set to connect to Shm later
01890     //if(hasShMem_) setAttachDqmToShm();
01891 
01892     int sc = 0;
01893     //maybe not needed in MP mode
01894     evtProcessor_->clearCounters();
01895     if(isRunNumberSetter_)
01896       evtProcessor_->setRunNumber(runNumber_.value_);
01897     else
01898       evtProcessor_->declareRunNumber(runNumber_.value_);
01899 
01900     //prepare object used to communicate with DaqSource
01901     pthread_mutex_destroy(&forkObjLock_);
01902     pthread_mutex_init(&forkObjLock_,0);
01903     if (forkInfoObj_) delete forkInfoObj_;
01904     forkInfoObj_ = new moduleweb::ForkInfoObj();
01905     forkInfoObj_->mst_lock_=&forkObjLock_;
01906     forkInfoObj_->fuAddr=(void*)this;
01907     forkInfoObj_->forkHandler = forkProcessFromEDM_helper;
01908     forkInfoObj_->forkParams.slotId=-1;
01909     forkInfoObj_->forkParams.restart=0;
01910     forkInfoObj_->forkParams.isMaster=-1;
01911     forkInfoObj_->stopCondition=0;
01912     if (mwrRef_)
01913       mwrRef_->publishForkInfo(std::string("DaqSource"),forkInfoObj_);
01914 
01915     evtProcessor_->runAsync();
01916     sc = evtProcessor_->statusAsync();
01917 
01918     if(sc != 0) {
01919       std::ostringstream oss;
01920       oss<<"EventProcessor::runAsync returned status code " << sc;
01921       reasonForFailedState_ = oss.str();
01922       fsm_.fireFailed(reasonForFailedState_,this);
01923       LOG4CPLUS_FATAL(log_,reasonForFailedState_);
01924       return false;
01925     }
01926   }
01927   //catch exceptions on master side
01928   catch(cms::Exception &e) {
01929     reasonForFailedState_ = e.explainSelf();
01930     fsm_.fireFailed(reasonForFailedState_,this);
01931     LOG4CPLUS_FATAL(log_,reasonForFailedState_);
01932     return false;
01933   }    
01934   catch(std::exception &e) {
01935     reasonForFailedState_  = e.what();
01936     fsm_.fireFailed(reasonForFailedState_,this);
01937     LOG4CPLUS_FATAL(log_,reasonForFailedState_);
01938     return false;
01939   }
01940   catch(...) {
01941     reasonForFailedState_ = "Unknown Exception";
01942     fsm_.fireFailed(reasonForFailedState_,this);
01943     LOG4CPLUS_FATAL(log_,reasonForFailedState_);
01944     return false;
01945   }
01946   return true;
01947 }
01948 
01949 bool FUEventProcessor::restartForkInEDM(unsigned int slotId) {
01950   //daqsource will keep this lock until master returns after fork
01951   //so that we don't do another EP restart in between
01952   forkInfoObj_->lock();
01953   forkInfoObj_->forkParams.slotId=slotId;
01954   forkInfoObj_->forkParams.restart=true;
01955   forkInfoObj_->forkParams.isMaster=1;
01956   forkInfoObj_->stopCondition=0;
01957   LOG4CPLUS_DEBUG(log_, " restarting subprocess in slot "<< slotId <<": posting on control semaphore");
01958   sem_post(forkInfoObj_->control_sem_);
01959   forkInfoObj_->unlock();
01960   usleep(1000);
01961   return true;
01962 }
01963 
01964 bool FUEventProcessor::enableClassic()
01965 {
01966   bool retval = enableCommon();
01967   while(evtProcessor_->getState()!= edm::event_processor::sRunning){
01968     LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
01969     ::sleep(1);
01970   }
01971   
01972   //  implementation moved to EPWrapper
01973   //  startScalersWorkLoop(); // this is now not done any longer 
01974   localLog("-I- Start completed");
01975   return retval;
01976 }
01977 bool FUEventProcessor::enableMPEPSlave()
01978 {
01979   //all this happens only in the child process
01980 
01981   startReceivingLoop();
01982   startReceivingMonitorLoop();
01983   evtProcessor_.resetWaiting();
01984   startScalersWorkLoop();
01985   while(!evtProcessor_.isWaitingForLs())
01986     ::sleep(1);
01987 
01988   // @EM test do not run monitor loop in slave, only receiving&Monitor
01989   //  evtProcessor_.startMonitoringWorkLoop();
01990   try{
01991     //    evtProcessor_.makeServicesOnly();
01992     try{
01993       LOG4CPLUS_DEBUG(getApplicationLogger(),
01994                       "Trying to create message service presence ");
01995       edm::PresenceFactory *pf = edm::PresenceFactory::get();
01996       if(pf != 0) {
01997         pf->makePresence("MessageServicePresence").release();
01998       }
01999       else {
02000         LOG4CPLUS_ERROR(getApplicationLogger(),
02001                         "Unable to create message service presence ");
02002       }
02003     } 
02004     catch(...) {
02005       LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
02006     }
02007   ML::MLlog4cplus::setAppl(this);
02008   }       
02009   catch (xcept::Exception &e) {
02010     reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
02011     fsm_.fireFailed(reasonForFailedState_,this);
02012     localLog(reasonForFailedState_);
02013   }
02014   bool retval =  enableCommon();
02015   //  while(evtProcessor_->getState()!= edm::event_processor::sRunning){
02016   //    LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
02017   //    ::sleep(1);
02018   //  }
02019   return retval;
02020 }
02021 
02022 bool FUEventProcessor::stopClassic()
02023 {
02024   try {
02025     LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping :) ...");
02026     edm::EventProcessor::StatusCode rc = evtProcessor_.stop();
02027     if(rc == edm::EventProcessor::epSuccess) 
02028       fsm_.fireEvent("StopDone",this);
02029     else
02030       {
02031         //      epMState_ = evtProcessor_->currentStateName();
02032         if(rc == edm::EventProcessor::epTimedOut)
02033           reasonForFailedState_ = "EventProcessor stop timed out";
02034         else
02035           reasonForFailedState_ = "EventProcessor did not receive STOP event";
02036         fsm_.fireFailed(reasonForFailedState_,this);
02037         localLog(reasonForFailedState_);
02038       }
02039     if(hasShMem_) detachDqmFromShm();
02040   }
02041   catch (xcept::Exception &e) {
02042     reasonForFailedState_ = "stopping FAILED: " + (std::string)e.what();
02043     localLog(reasonForFailedState_);
02044     fsm_.fireFailed(reasonForFailedState_,this);
02045   }
02046   LOG4CPLUS_INFO(getApplicationLogger(),"Finished stopping!");
02047   localLog("-I- Stop completed");
02048   return false;
02049 }
02050 
02051 void FUEventProcessor::stopSlavesAndAcknowledge()
02052 {
02053   MsgBuf msg(0,MSQM_MESSAGE_TYPE_STOP);
02054   MsgBuf msg1(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_STOP);
02055 
02056   std::vector<bool> processes_to_stop(nbSubProcesses_.value_,false);
02057   for(unsigned int i = 0; i < subs_.size(); i++)
02058     {
02059       pthread_mutex_lock(&stop_lock_);
02060       if(subs_[i].alive()>0){
02061         processes_to_stop[i] = true;
02062         subs_[i].post(msg,false);
02063       }
02064       pthread_mutex_unlock(&stop_lock_);
02065     }
02066   for(unsigned int i = 0; i < subs_.size(); i++)
02067     {
02068       pthread_mutex_lock(&stop_lock_);
02069       if(processes_to_stop[i]){
02070         try{
02071           subs_[i].rcv(msg1,false);
02072         }
02073         catch(evf::Exception &e){
02074           std::ostringstream ost;
02075           ost << "failed to get STOP - errno ->" << errno << " " << e.what(); 
02076           reasonForFailedState_ = ost.str();
02077           LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02078           //      fsm_.fireFailed(reasonForFailedState_,this);
02079           localLog(reasonForFailedState_);
02080           pthread_mutex_unlock(&stop_lock_);
02081           continue;
02082         }
02083       }
02084       else {
02085         pthread_mutex_unlock(&stop_lock_);
02086         continue;
02087       }
02088       pthread_mutex_unlock(&stop_lock_);
02089       if(msg1->mtype==MSQS_MESSAGE_TYPE_STOP)
02090         while(subs_[i].alive()>0) ::usleep(10000);
02091       subs_[i].disconnect();
02092     }
02093   //  subs_.clear();
02094 
02095 }
02096 
02097 void FUEventProcessor::microState(xgi::Input *in,xgi::Output *out)
02098 {
02099   std::string urn = getApplicationDescriptor()->getURN();
02100   try{
02101     evtProcessor_.stateNameFromIndex(0);
02102     evtProcessor_.moduleNameFromIndex(0);
02103   if(myProcess_) {std::cout << "microstate called for child! bail out" << std::endl; return;}
02104   *out << "<tr><td>" << fsm_.stateName()->toString() 
02105        << "</td><td>"<< (myProcess_ ? "S" : "M") <<"</td><td>" << nblive_ << "</td><td>"
02106        << nbdead_ << "</td><td><a href=\"/" << urn << "/procStat\">" << getpid() <<"</a></td>";
02107   evtProcessor_.microState(in,out);
02108   *out << "<td></td><td>" << nbTotalDQM_ 
02109        << "</td><td>" << evtProcessor_.getScalersUpdates() << "</td></tr>";
02110   if(nbSubProcesses_.value_!=0 && !myProcess_) 
02111     {
02112       pthread_mutex_lock(&start_lock_);
02113       for(unsigned int i = 0; i < subs_.size(); i++)
02114         {
02115           try{
02116             if(subs_[i].alive()>0)
02117               {
02118                 *out << "<tr><td  bgcolor=\"#00FF00\" id=\"a"
02119                      << i << "\">""Alive</td><td>S</td><td>"
02120                      << subs_[i].queueId() << "<td>" 
02121                      << subs_[i].queueStatus()<< "/"
02122                      << subs_[i].queueOccupancy() << "/"
02123                      << subs_[i].queuePidOfLastSend() << "/"
02124                      << subs_[i].queuePidOfLastReceive() 
02125                      << "</td><td><a id=\"p"<< i << "\" href=\"SubWeb?process=" 
02126                      << subs_[i].pid() << "&method=procStat\">" 
02127                      << subs_[i].pid()<<"</a></td>" //<< msg->mtext;
02128                      << "<td>" << evtProcessor_.stateNameFromIndex(subs_[i].params().Ms) << "</td><td>" 
02129                      << evtProcessor_.moduleNameFromIndex(subs_[i].params().ms) << "</td><td>" 
02130                      << subs_[i].params().nba << "/" << subs_[i].params().nbp 
02131                      << " (" << float(subs_[i].params().nba)/float(subs_[i].params().nbp)*100. <<"%)" 
02132                      << "</td><td>" << subs_[i].params().ls  << "/" << subs_[i].params().ls 
02133                      << "</td><td>" << subs_[i].params().ps 
02134                      << "</td><td" 
02135                      << ((subs_[i].params().eols<subs_[i].params().ls) ? " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")  << ">" 
02136                      << subs_[i].params().eols  
02137                      << "</td><td>" << subs_[i].params().dqm 
02138                      << "</td><td>" << subs_[i].params().trp << "</td>";
02139               }
02140             else 
02141               {
02142                 pthread_mutex_lock(&pickup_lock_);
02143                 *out << "<tr><td id=\"a"<< i << "\" ";
02144                 if(subs_[i].alive()==-1000)
02145                   *out << " bgcolor=\"#bbaabb\">NotInitialized";
02146                 else
02147                   *out << (subs_[i].alive()==0 ? ">Done" : " bgcolor=\"#FF0000\">Dead");
02148                 *out << "</td><td>S</td><td>"<< subs_[i].queueId() << "<td>" 
02149                      << subs_[i].queueStatus() << "/"
02150                      << subs_[i].queueOccupancy() << "/"
02151                      << subs_[i].queuePidOfLastSend() << "/"
02152                      << subs_[i].queuePidOfLastReceive() 
02153                      << "</td><td id=\"p"<< i << "\">"
02154                      <<subs_[i].pid()<<"</td><td colspan=\"5\">" << subs_[i].reasonForFailed();
02155                 if(subs_[i].alive()!=0 && subs_[i].alive()!=-1000) 
02156                   {
02157                     if(autoRestartSlaves_ && subs_[i].restartCount()<=2) 
02158                       *out << " will restart in " << subs_[i].countdown() << " s";
02159                     else if(autoRestartSlaves_)
02160                       *out << " reached maximum restart count";
02161                     else *out << " autoRestart is disabled ";
02162                   }
02163                 *out << "</td><td" 
02164                      << ((subs_[i].params().eols<subs_[i].params().ls) ? 
02165                          " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")  
02166                      << ">" 
02167                      << subs_[i].params().eols  
02168                      << "</td><td>" << subs_[i].params().dqm 
02169                      << "</td><td>" << subs_[i].params().trp << "</td>";
02170                 pthread_mutex_unlock(&pickup_lock_);
02171               }
02172             *out << "</tr>";
02173           }
02174           catch(evf::Exception &e){
02175             *out << "<tr><td id=\"a"<< i << "\" " 
02176                  <<"bgcolor=\"#FFFF00\">NotResp</td><td>S</td><td>"<< subs_[i].queueId() << "<td>" 
02177                  << subs_[i].queueStatus() << "/"
02178                  << subs_[i].queueOccupancy() << "/"
02179                  << subs_[i].queuePidOfLastSend() << "/"
02180                  << subs_[i].queuePidOfLastReceive() 
02181                  << "</td><td id=\"p"<< i << "\">"
02182                  <<subs_[i].pid()<<"</td>";
02183           }
02184         }
02185       pthread_mutex_unlock(&start_lock_); 
02186     }
02187   }
02188       catch(evf::Exception &e)
02189       {
02190         LOG4CPLUS_INFO(getApplicationLogger(),"evf::Exception caught in microstate - " << e.what());    
02191       }
02192     catch(cms::Exception &e)
02193       {
02194         LOG4CPLUS_INFO(getApplicationLogger(),"cms::Exception caught in microstate - " << e.what());    
02195       }
02196     catch(std::exception &e)
02197       {
02198         LOG4CPLUS_INFO(getApplicationLogger(),"std::Exception caught in microstate - " << e.what());    
02199       }
02200     catch(...)
02201       {
02202         LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception caught in microstate - ");    
02203       }
02204 
02205 }
02206 
02207 
02208 void FUEventProcessor::updater(xgi::Input *in,xgi::Output *out)
02209 {
02210   using namespace utils;
02211 
02212   *out << updaterStatic_;
02213   mDiv(out,"loads");
02214   uptime(out);
02215   cDiv(out);
02216   mDiv(out,"st",fsm_.stateName()->toString());
02217   mDiv(out,"ru",runNumber_.toString());
02218   mDiv(out,"nsl",nbSubProcesses_.value_);
02219   mDiv(out,"nsr",nbSubProcessesReporting_.value_);
02220   mDiv(out,"cl");
02221   *out << getApplicationDescriptor()->getClassName() 
02222        << (nbSubProcesses_.value_ > 0 ? "MP " : " ");
02223   cDiv(out);
02224   mDiv(out,"in",getApplicationDescriptor()->getInstance());
02225   if(fsm_.stateName()->toString() != "Halted" && fsm_.stateName()->toString() != "halting"){
02226     mDiv(out,"hlt");
02227     *out << "<a href=\"" << configString_.toString() << "\">HLT Config</a>";
02228     cDiv(out);
02229     *out << std::endl;
02230   }
02231   else
02232     mDiv(out,"hlt","Not yet...");
02233 
02234   mDiv(out,"sq",squidPresent_.toString());
02235   mDiv(out,"vwl",(supervising_ ? "Active" : "Not Initialized"));
02236   mDiv(out,"mwl",evtProcessor_.wlMonitoring());
02237   if(nbProcessed != 0 && nbAccepted != 0)
02238     {
02239       mDiv(out,"tt",((xdata::UnsignedInteger32*)nbProcessed)->value_);
02240       mDiv(out,"ac",((xdata::UnsignedInteger32*)nbAccepted)->value_);
02241     }
02242   else
02243     {
02244       mDiv(out,"tt",0);
02245       mDiv(out,"ac",0);
02246     }
02247   if(!myProcess_)
02248     mDiv(out,"swl",(wlSummarizeActive_ ? "Active" : "Inactive"));
02249   else
02250     mDiv(out,"swl",(wlScalersActive_ ? "Active" : "Inactive"));
02251 
02252   mDiv(out,"idi",iDieUrl_.value_);
02253   if(vp_!=0){
02254     mDiv(out,"vpi",(unsigned int) vp_);
02255     if(vulture_->hasStarted()>=0)
02256       mDiv(out,"vul","Prowling");
02257     else
02258       mDiv(out,"vul","Dead");
02259   }
02260   else{
02261     mDiv(out,"vul",(vulture_==0 ? "Nope" : "Hatching"));
02262   }    
02263   if(evtProcessor_){
02264     mDiv(out,"ll");
02265     *out << evtProcessor_.lastLumi().ls
02266          << "," << evtProcessor_.lastLumi().proc << "," << evtProcessor_.lastLumi().acc;
02267     cDiv(out);
02268   }
02269   mDiv(out,"lg");
02270   for(unsigned int i = logRingIndex_; i<logRingSize_; i++)
02271     *out << logRing_[i] << std::endl;
02272   if(logWrap_)
02273     for(unsigned int i = 0; i<logRingIndex_; i++)
02274       *out << logRing_[i] << std::endl;
02275   cDiv(out);
02276   mDiv(out,"cha");
02277   if(cpustat_) *out << cpustat_->getChart();
02278   cDiv(out);
02279 }
02280 
02281 void FUEventProcessor::procStat(xgi::Input *in, xgi::Output *out)
02282 {
02283   evf::utils::procStat(out);
02284 }
02285 
02286 void FUEventProcessor::sendMessageOverMonitorQueue(MsgBuf &buf)
02287 {
02288   if(myProcess_) myProcess_->postSlave(buf,true);
02289 }
02290 
02291 void FUEventProcessor::makeStaticInfo()
02292 {
02293   using namespace utils;
02294   std::ostringstream ost;
02295   mDiv(&ost,"ve");
02296   ost<< "$Revision: 1.134.2.6 $ (" << edm::getReleaseVersion() <<")";
02297   cDiv(&ost);
02298   mDiv(&ost,"ou",outPut_.toString());
02299   mDiv(&ost,"sh",hasShMem_.toString());
02300   mDiv(&ost,"mw",hasModuleWebRegistry_.toString());
02301   mDiv(&ost,"sw",hasServiceWebRegistry_.toString());
02302   
02303   xdata::Serializable *monsleep = 0;
02304   xdata::Serializable *lstimeout = 0;
02305   try{
02306     monsleep  = applicationInfoSpace_->find("monSleepSec");
02307     lstimeout = applicationInfoSpace_->find("lsTimeOut");
02308   }
02309   catch(xdata::exception::Exception e){
02310   }
02311   
02312   if(monsleep!=0)
02313     mDiv(&ost,"ms",monsleep->toString());
02314   if(lstimeout!=0)
02315     mDiv(&ost,"lst",lstimeout->toString());
02316   char cbuf[sizeof(struct utsname)];
02317   struct utsname* buf = (struct utsname*)cbuf;
02318   uname(buf);
02319   mDiv(&ost,"sysinfo");
02320   ost << buf->sysname << " " << buf->nodename 
02321       << " " << buf->release << " " << buf->version << " " << buf->machine;
02322   cDiv(&ost);
02323   updaterStatic_ = ost.str();
02324 }
02325 
// NOTE(review): XDAQ_INSTANTIATOR_IMPL is a macro defined outside this file;
// presumably it generates the hook through which the XDAQ framework creates
// evf::FUEventProcessor instances -- confirm against the XDAQ headers.
02326 XDAQ_INSTANTIATOR_IMPL(evf::FUEventProcessor)