CMS 3D CMS Logo

Public Member Functions | Static Public Member Functions | Private Member Functions | Private Attributes | Static Private Attributes

evf::FUEventProcessor Class Reference

#include <FUEventProcessor.h>

List of all members.

Public Member Functions

void actionPerformed (xdata::Event &e)
bool configuring (toolbox::task::WorkLoop *wl)
void css (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
void defaultWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
bool enabling (toolbox::task::WorkLoop *wl)
xoap::MessageReference fsmCallback (xoap::MessageReference msg) throw (xoap::exception::Exception)
 FUEventProcessor (xdaq::ApplicationStub *s)
bool halting (toolbox::task::WorkLoop *wl)
void microState (xgi::Input *in, xgi::Output *out)
void moduleWeb (xgi::Input *in, xgi::Output *out)
void pathNames (xgi::Input *, xgi::Output *) throw (xgi::exception::Exception)
void procStat (xgi::Input *in, xgi::Output *out)
void scalersWeb (xgi::Input *, xgi::Output *) throw (xgi::exception::Exception)
void sendMessageOverMonitorQueue (MsgBuf &)
void serviceWeb (xgi::Input *in, xgi::Output *out)
void spotlightWebPage (xgi::Input *, xgi::Output *) throw (xgi::exception::Exception)
bool stopping (toolbox::task::WorkLoop *wl)
void subWeb (xgi::Input *in, xgi::Output *out)
void updater (xgi::Input *in, xgi::Output *out)
 XDAQ_INSTANTIATOR ()
virtual ~FUEventProcessor ()

Static Public Member Functions

static void forkProcessFromEDM_helper (void *addr)

Private Member Functions

void attachDqmToShm () throw (evf::Exception)
void detachDqmFromShm () throw (evf::Exception)
bool doEndRunInEDM ()
bool enableClassic ()
bool enableCommon ()
bool enableForkInEDM ()
bool enableMPEPSlave ()
void forkProcessesFromEDM ()
void localLog (std::string)
std::string logsAsString ()
void makeStaticInfo ()
bool receiving (toolbox::task::WorkLoop *wl)
bool receivingAndMonitor (toolbox::task::WorkLoop *wl)
bool restartForkInEDM (unsigned int slotId)
bool scalers (toolbox::task::WorkLoop *wl)
void setAttachDqmToShm () throw (evf::Exception)
void startReceivingLoop ()
void startReceivingMonitorLoop ()
void startScalersWorkLoop () throw (evf::Exception)
void startSummarizeWorkLoop () throw (evf::Exception)
void startSupervisorLoop ()
bool stopClassic ()
void stopSlavesAndAcknowledge ()
bool summarize (toolbox::task::WorkLoop *wl)
bool supervisor (toolbox::task::WorkLoop *wl)

Private Attributes

int anonymousPipe_ [2]
xdata::InfoSpace * applicationInfoSpace_
toolbox::task::ActionSignature * asReceiveMsgAndExecute_
toolbox::task::ActionSignature * asReceiveMsgAndRead_
toolbox::task::ActionSignature * asScalers_
toolbox::task::ActionSignature * asSummarize_
toolbox::task::ActionSignature * asSupervisor_
xdata::Boolean autoRestartSlaves_
xdata::String class_
xdata::String configString_
std::string configuration_
CPUStatcpustat_
Css css_
bool edm_init_done_
xdata::Boolean epInitialized_
FWEPWrapper evtProcessor_
xdata::Boolean exitOnError_
xdata::UnsignedInteger32 forkInEDM_
moduleweb::ForkInfoObjforkInfoObj_
pthread_mutex_t forkObjLock_
evf::StateMachine fsm_
xdata::Boolean hasModuleWebRegistry_
xdata::Boolean hasPrescaleService_
xdata::Boolean hasServiceWebRegistry_
xdata::Boolean hasShMem_
xdata::Boolean iDieStatisticsGathering_
xdata::String iDieUrl_
xdata::UnsignedInteger32 instance_
xdata::Boolean isRunNumberSetter_
Logger log_
std::vector< std::string > logRing_
unsigned int logRingIndex_
bool logWrap_
MsgBuf master_message_prg_
MsgBuf master_message_prr_
MsgBuf master_message_trr_
xdata::InfoSpace * monitorInfoSpace_
xdata::InfoSpace * monitorLegendaInfoSpace_
ModuleWebRegistrymwrRef_
SubProcessmyProcess_
std::list< std::string > names_
xdata::Serializable * nbAccepted
unsigned int nbdead_
unsigned int nblive_
xdata::Serializable * nbProcessed
xdata::UnsignedInteger32 nbSubProcesses_
xdata::UnsignedInteger32 nbSubProcessesReporting_
unsigned int nbTotalDQM_
bool outprev_
xdata::Boolean outPut_
pthread_mutex_t pickup_lock_
RateStatratestat_
std::string reasonForFailedState_
bool receiving_
bool receivingM_
xdata::UnsignedInteger32 runNumber_
xdata::InfoSpace * scalersInfoSpace_
xdata::InfoSpace * scalersLegendaInfoSpace_
unsigned int scalersUpdates_
MsgBuf slave_message_monitoring_
MsgBuf slave_message_prr_
xdata::UnsignedInteger32 slaveRestartDelaySecs_
ShmOutputModuleRegistrysorRef_
std::string sourceId_
xdata::Vector< xdata::Integer > spMStates_
xdata::Vector< xdata::Integer > spmStates_
SquidNet squidnet_
xdata::Boolean squidPresent_
pthread_mutex_t start_lock_
pthread_mutex_t stop_lock_
std::vector< SubProcesssubs_
xdata::UnsignedInteger32 superSleepSec_
bool supervising_
std::string updaterStatic_
xdata::String url_
pid_t vp_
Vulturevulture_
toolbox::task::WorkLoop * wlReceiving_
toolbox::task::WorkLoop * wlReceivingMonitor_
toolbox::task::WorkLoop * wlScalers_
bool wlScalersActive_
toolbox::task::WorkLoop * wlSummarize_
bool wlSummarizeActive_
toolbox::task::WorkLoop * wlSupervising_

Static Private Attributes

static const unsigned int logRingSize_ = 50

Detailed Description

Definition at line 61 of file FUEventProcessor.h.


Constructor & Destructor Documentation

FUEventProcessor::FUEventProcessor ( xdaq::ApplicationStub *  s)

Definition at line 63 of file FUEventProcessor.cc.

References anonymousPipe_, applicationInfoSpace_, autoRestartSlaves_, evf::SquidNet::check(), class_, configString_, css(), defaultWebPage(), epInitialized_, evtProcessor_, evf::StateMachine::findRcmsStateListener(), forkInEDM_, forkInfoObj_, forkObjLock_, evf::StateMachine::foundRcmsStateListener(), fsm_, reco::get(), hasModuleWebRegistry_, hasPrescaleService_, hasServiceWebRegistry_, hasShMem_, iDieStatisticsGathering_, iDieUrl_, evf::StateMachine::initialize(), instance_, isRunNumberSetter_, edm::PresenceFactory::makePresence(), makeStaticInfo(), microState(), moduleWeb(), monitorInfoSpace_, monitorLegendaInfoSpace_, names_, nbSubProcesses_, nbSubProcessesReporting_, outPut_, pathNames(), pickup_lock_, testRegParams::pipe, procStat(), evf::FWEPWrapper::publishConfigAndMonitorItems(), evf::StateMachine::rcmsStateListener(), runNumber_, scalersInfoSpace_, scalersLegendaInfoSpace_, scalersWeb(), serviceWeb(), evf::FWEPWrapper::setAppCtxt(), evf::FWEPWrapper::setAppDesc(), ML::MLlog4cplus::setAppl(), evf::FWEPWrapper::setApplicationInfoSpace(), evf::FWEPWrapper::setMonitorInfoSpace(), evf::FWEPWrapper::setRcms(), evf::FWEPWrapper::setScalersInfoSpace(), slaveRestartDelaySecs_, sourceId_, spMStates_, spmStates_, spotlightWebPage(), squidnet_, squidPresent_, start_lock_, startSupervisorLoop(), evf::StateMachine::stateName(), stop_lock_, subWeb(), superSleepSec_, updater(), url_, and vulture_.

  : xdaq::Application(s)
  , fsm_(this)
  , log_(getApplicationLogger())
  , evtProcessor_(log_, getApplicationDescriptor()->getInstance())
  , runNumber_(0)
  , epInitialized_(false)
  , outPut_(true)
  , autoRestartSlaves_(false)
  , slaveRestartDelaySecs_(10)
  , hasShMem_(true)
  , hasPrescaleService_(true)
  , hasModuleWebRegistry_(true)
  , hasServiceWebRegistry_(true)
  , isRunNumberSetter_(true)
  , iDieStatisticsGathering_(false)
  , outprev_(true)
  , exitOnError_(true)
  , reasonForFailedState_()
  , squidnet_(3128,"https://localhost:8000/RELEASE-NOTES.txt")
  , logRing_(logRingSize_)
  , logRingIndex_(logRingSize_)
  , logWrap_(false)
  , nbSubProcesses_(0)
  , nbSubProcessesReporting_(0)
  , forkInEDM_(true)
  , nblive_(0)
  , nbdead_(0)
  , nbTotalDQM_(0)
  , wlReceiving_(0)
  , asReceiveMsgAndExecute_(0)
  , receiving_(false) 
  , wlReceivingMonitor_(0)
  , asReceiveMsgAndRead_(0)
  , receivingM_(false)
  , myProcess_(0)
  , wlSupervising_(0)
  , asSupervisor_(0)
  , supervising_(false)
  , monitorInfoSpace_(0)
  , monitorLegendaInfoSpace_(0)
  , applicationInfoSpace_(0)
  , nbProcessed(0)
  , nbAccepted(0)
  , scalersInfoSpace_(0)
  , scalersLegendaInfoSpace_(0)
  , wlScalers_(0)
  , asScalers_(0)
  , wlScalersActive_(false)
  , scalersUpdates_(0)
  , wlSummarize_(0)
  , asSummarize_(0)
  , wlSummarizeActive_(false)
  , superSleepSec_(1)
  , iDieUrl_("none")
  , vulture_(0)
  , vp_(0)
  , cpustat_(0)
  , ratestat_(0)
  , mwrRef_(nullptr)
  , sorRef_(nullptr)
  , master_message_prg_(0,MSQM_MESSAGE_TYPE_PRG)
  , master_message_prr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_PRR)
  , slave_message_prr_(sizeof(prg),MSQS_MESSAGE_TYPE_PRR)
  , master_message_trr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_TRR)
  , edm_init_done_(true)
{
  using namespace utils;

  names_.push_back("nbProcessed"    );
  names_.push_back("nbAccepted"     );
  names_.push_back("epMacroStateInt");
  names_.push_back("epMicroStateInt");
  // create pipe for web communication
  int retpipe = pipe(anonymousPipe_);
  if(retpipe != 0)
        LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to create pipe");
  // check squid presence
  squidPresent_ = squidnet_.check();
  //pass application parameters to FWEPWrapper
  evtProcessor_.setAppDesc(getApplicationDescriptor());
  evtProcessor_.setAppCtxt(getApplicationContext());
  // bind relevant callbacks to finite state machine
  fsm_.initialize<evf::FUEventProcessor>(this);
  
  //set sourceId_
  url_ =
    getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
    getApplicationDescriptor()->getURN();
  class_   =getApplicationDescriptor()->getClassName();
  instance_=getApplicationDescriptor()->getInstance();
  sourceId_=class_.toString()+"_"+instance_.toString();
  LOG4CPLUS_INFO(getApplicationLogger(),sourceId_ <<" constructor"         );
  LOG4CPLUS_INFO(getApplicationLogger(),"CMSSW_BASE:"<<getenv("CMSSW_BASE"));
  
  getApplicationDescriptor()->setAttribute("icon", "/evf/images/epicon.jpg");
  
  xdata::InfoSpace *ispace = getApplicationInfoSpace();
  applicationInfoSpace_ = ispace;

  // default configuration
  ispace->fireItemAvailable("parameterSet",         &configString_                );
  ispace->fireItemAvailable("epInitialized",        &epInitialized_               );
  ispace->fireItemAvailable("stateName",             fsm_.stateName()             );
  ispace->fireItemAvailable("runNumber",            &runNumber_                   );
  ispace->fireItemAvailable("outputEnabled",        &outPut_                      );

  ispace->fireItemAvailable("hasSharedMemory",      &hasShMem_);
  ispace->fireItemAvailable("hasPrescaleService",   &hasPrescaleService_          );
  ispace->fireItemAvailable("hasModuleWebRegistry", &hasModuleWebRegistry_        );
  ispace->fireItemAvailable("hasServiceWebRegistry", &hasServiceWebRegistry_      );
  ispace->fireItemAvailable("isRunNumberSetter",    &isRunNumberSetter_           );
  ispace->fireItemAvailable("iDieStatisticsGathering",   &iDieStatisticsGathering_);
  ispace->fireItemAvailable("rcmsStateListener",     fsm_.rcmsStateListener()     );
  ispace->fireItemAvailable("foundRcmsStateListener",fsm_.foundRcmsStateListener());
  ispace->fireItemAvailable("nbSubProcesses",       &nbSubProcesses_              );
  ispace->fireItemAvailable("nbSubProcessesReporting",&nbSubProcessesReporting_   );
  ispace->fireItemAvailable("forkInEDM"             ,&forkInEDM_                  );
  ispace->fireItemAvailable("superSleepSec",        &superSleepSec_               );
  ispace->fireItemAvailable("autoRestartSlaves",    &autoRestartSlaves_           );
  ispace->fireItemAvailable("slaveRestartDelaySecs",&slaveRestartDelaySecs_       );
  ispace->fireItemAvailable("iDieUrl",              &iDieUrl_                     );
  
  // Add infospace listeners for exporting data values
  getApplicationInfoSpace()->addItemChangedListener("parameterSet",        this);
  getApplicationInfoSpace()->addItemChangedListener("outputEnabled",       this);

  // findRcmsStateListener
  fsm_.findRcmsStateListener();
  
  // initialize monitoring infospace

  std::string monInfoSpaceName="evf-eventprocessor-status-monitor";
  toolbox::net::URN urn = this->createQualifiedInfoSpace(monInfoSpaceName);
  monitorInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());

  std::string monLegendaInfoSpaceName="evf-eventprocessor-status-legenda";
  urn = this->createQualifiedInfoSpace(monLegendaInfoSpaceName);
  monitorLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());

  
  monitorInfoSpace_->fireItemAvailable("url",                      &url_            );
  monitorInfoSpace_->fireItemAvailable("class",                    &class_          );
  monitorInfoSpace_->fireItemAvailable("instance",                 &instance_       );
  monitorInfoSpace_->fireItemAvailable("runNumber",                &runNumber_      );
  monitorInfoSpace_->fireItemAvailable("stateName",                 fsm_.stateName()); 

  monitorInfoSpace_->fireItemAvailable("squidPresent",             &squidPresent_   );

  std::string scalersInfoSpaceName="evf-eventprocessor-scalers-monitor";
  urn = this->createQualifiedInfoSpace(scalersInfoSpaceName);
  scalersInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());

  std::string scalersLegendaInfoSpaceName="evf-eventprocessor-scalers-legenda";
  urn = this->createQualifiedInfoSpace(scalersLegendaInfoSpaceName);
  scalersLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());



  evtProcessor_.setScalersInfoSpace(scalersInfoSpace_,scalersLegendaInfoSpace_);
  scalersInfoSpace_->fireItemAvailable("instance", &instance_);

  evtProcessor_.setApplicationInfoSpace(ispace);
  evtProcessor_.setMonitorInfoSpace(monitorInfoSpace_,monitorLegendaInfoSpace_);
  evtProcessor_.publishConfigAndMonitorItems(nbSubProcesses_.value_!=0);

  //subprocess state vectors for MP
  monitorInfoSpace_->fireItemAvailable("epMacroStateInt",             &spMStates_); 
  monitorInfoSpace_->fireItemAvailable("epMicroStateInt",             &spmStates_); 
  
  // Bind web interface
  xgi::bind(this, &FUEventProcessor::css,              "styles.css");
  xgi::bind(this, &FUEventProcessor::defaultWebPage,   "Default"   );
  xgi::bind(this, &FUEventProcessor::spotlightWebPage, "Spotlight" );
  xgi::bind(this, &FUEventProcessor::scalersWeb,       "scalersWeb");
  xgi::bind(this, &FUEventProcessor::pathNames,        "pathNames" );
  xgi::bind(this, &FUEventProcessor::subWeb,           "SubWeb"    );
  xgi::bind(this, &FUEventProcessor::moduleWeb,        "moduleWeb" );
  xgi::bind(this, &FUEventProcessor::serviceWeb,       "serviceWeb");
  xgi::bind(this, &FUEventProcessor::microState,       "microState");
  xgi::bind(this, &FUEventProcessor::updater,          "updater"   );
  xgi::bind(this, &FUEventProcessor::procStat,         "procStat"  );

  // instantiate the plugin manager, not referenced here after!

  edm::AssertHandler ah;

  try{
    LOG4CPLUS_DEBUG(getApplicationLogger(),
                    "Trying to create message service presence ");
    edm::PresenceFactory *pf = edm::PresenceFactory::get();
    if(pf != 0) {
      pf->makePresence("MessageServicePresence").release();
    }
    else {
      LOG4CPLUS_ERROR(getApplicationLogger(),
                      "Unable to create message service presence ");
    }
  } 
  catch(...) {
    LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
  }
  ML::MLlog4cplus::setAppl(this);      

  typedef std::set<xdaq::ApplicationDescriptor*> AppDescSet_t;
  typedef AppDescSet_t::iterator                 AppDescIter_t;
  
  AppDescSet_t rcms=
    getApplicationContext()->getDefaultZone()->
    getApplicationDescriptors("RCMSStateListener");
  if(rcms.size()==0) 
    {
      LOG4CPLUS_WARN(getApplicationLogger(),
                       "MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
      //        localLog("-W- MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
    }
  else
    {
      AppDescIter_t it = rcms.begin();
      evtProcessor_.setRcms(*it);
    }
  pthread_mutex_init(&start_lock_,0);
  pthread_mutex_init(&stop_lock_,0);
  pthread_mutex_init(&pickup_lock_,0);

  forkInfoObj_=nullptr;
  pthread_mutex_init(&forkObjLock_,0);
  makeStaticInfo();
  startSupervisorLoop();  

  if(vulture_==0) vulture_ = new Vulture(true);


  AppDescSet_t setOfiDie=
    getApplicationContext()->getDefaultZone()->
    getApplicationDescriptors("evf::iDie");
  
  for (AppDescIter_t it=setOfiDie.begin();it!=setOfiDie.end();++it)
    if ((*it)->getInstance()==0) // there has to be only one instance of iDie
      iDieUrl_ = (*it)->getContextDescriptor()->getURL() + "/" + (*it)->getURN();
}
FUEventProcessor::~FUEventProcessor ( ) [virtual]

Definition at line 309 of file FUEventProcessor.cc.

References vulture_.

{
  // no longer needed since the wrapper is a member of the class and one can rely on 
  // implicit destructor - to be revised - at any rate the most common exit path is via "kill"...
  //  if (evtProcessor_) delete evtProcessor_;
  if(vulture_ != 0) delete vulture_;
}

Member Function Documentation

void FUEventProcessor::actionPerformed ( xdata::Event &  e)

Definition at line 679 of file FUEventProcessor.cc.

References alignCSCRings::e, epInitialized_, evtProcessor_, fsm_, outprev_, outPut_, and evf::StateMachine::stateName().

{

  if (e.type()=="ItemChangedEvent" && fsm_.stateName()->toString()!="Halted") {
    std::string item = dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
    
    if ( item == "parameterSet") {
      LOG4CPLUS_WARN(getApplicationLogger(),
                     "HLT Menu changed, force reinitialization of EventProcessor");
      epInitialized_ = false;
    }
    if ( item == "outputEnabled") {
      if(outprev_ != outPut_) {
        LOG4CPLUS_WARN(getApplicationLogger(),
                       (outprev_ ? "Disabling " : "Enabling ")<<"global output");
        evtProcessor_->enableEndPaths(outPut_);
        outprev_ = outPut_;
      }
    }
    if (item == "globalInputPrescale") {
      LOG4CPLUS_WARN(getApplicationLogger(),
                     "Setting global input prescale has no effect "
                     <<"in this version of the code");
    }
    if ( item == "globalOutputPrescale") {
      LOG4CPLUS_WARN(getApplicationLogger(),
                     "Setting global output prescale has no effect "
                     <<"in this version of the code");
    }
  }
  
}
void FUEventProcessor::attachDqmToShm ( ) throw (evf::Exception) [private]

Definition at line 924 of file FUEventProcessor.cc.

References alignCSCRings::e, evtProcessor_, edm::Service< T >::isAvailable(), summarizeEdmComparisonLogfiles::success, and cms::Exception::what().

Referenced by enableCommon(), and forkProcessesFromEDM().

{
  std::string errmsg;
  bool success = false;
  try {
    edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
    if(edm::Service<FUShmDQMOutputService>().isAvailable())
      success = edm::Service<FUShmDQMOutputService>()->attachToShm();
    if (!success) errmsg = "Failed to attach DQM service to shared memory";
  }
  catch (cms::Exception& e) {
    errmsg = "Failed to attach DQM service to shared memory: " + (std::string)e.what();
  }
  if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
}
bool FUEventProcessor::configuring ( toolbox::task::WorkLoop *  wl)

Definition at line 324 of file FUEventProcessor.cc.

References configString_, evf::FWEPWrapper::configuration(), configuration_, cpustat_, alignCSCRings::e, epInitialized_, evtProcessor_, exception, Exception, cms::Exception::explainSelf(), evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, evf::FWEPWrapper::getmicromap(), evf::FWEPWrapper::getModuleWebRegistry(), evf::FWEPWrapper::getNumberOfMicrostates(), evf::FWEPWrapper::getShmOutputModuleRegistry(), hasModuleWebRegistry_, hasPrescaleService_, hasServiceWebRegistry_, i, iDieStatisticsGathering_, iDieUrl_, evf::FWEPWrapper::init(), instance_, localLog(), evf::Vulture::makeProcess(), mwrRef_, nbSubProcesses_, ratestat_, reasonForFailedState_, scalersLegendaInfoSpace_, evf::CPUStat::sendLegenda(), evf::RateStat::sendLegenda(), edm::event_processor::sInit, sorRef_, spMStates_, spmStates_, evf::FWEPWrapper::startMonitoringWorkLoop(), vp_, and vulture_.

{
//   std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
//          << ((instance_.value_==0) ? 0x8 : 0) << " "
//          << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
//          << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
//          << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
  unsigned short smap 
    = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
    + (((instance_.value_%80)==0) ? 0x8 : 0) // have at least one legend per slice
    + (hasServiceWebRegistry_.value_ ? 0x4 : 0) 
    + (hasModuleWebRegistry_.value_ ? 0x2 : 0) 
    + (hasPrescaleService_.value_ ? 0x1 : 0);
  if(nbSubProcesses_.value_==0) 
    {
      spMStates_.setSize(1); 
      spmStates_.setSize(1); 
    }
  else
    {
      spMStates_.setSize(nbSubProcesses_.value_);
      spmStates_.setSize(nbSubProcesses_.value_);
      for(unsigned int i = 0; i < spMStates_.size(); i++)
        {
          spMStates_[i] = edm::event_processor::sInit; 
          spmStates_[i] = 0; 
        }
    }
  try {
    LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ...");
    std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
    epInitialized_=true;
    if(evtProcessor_)
      {
        //get ref of mwr
        mwrRef_ = evtProcessor_.getModuleWebRegistry();
        sorRef_ = evtProcessor_.getShmOutputModuleRegistry();
        // moved to wrapper class
        configuration_ = evtProcessor_.configuration();
        if(nbSubProcesses_.value_==0) evtProcessor_.startMonitoringWorkLoop(); 
        evtProcessor_->beginJob(); 
        if(cpustat_) {delete cpustat_; cpustat_=0;}
        cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
                               nbSubProcesses_.value_,
                               instance_.value_,
                               iDieUrl_.value_);
        if(ratestat_) {delete ratestat_; ratestat_=0;}
        ratestat_ = new RateStat(iDieUrl_.value_);
        if(iDieStatisticsGathering_.value_){
          try{
            cpustat_->sendLegenda(evtProcessor_.getmicromap());
            xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
            if(legenda !=0){
              std::string slegenda = ((xdata::String*)legenda)->value_;
              ratestat_->sendLegenda(slegenda);
            }

          }
          catch(evf::Exception &e){
            LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
                           << e.what());
          }
        }
        
        fsm_.fireEvent("ConfigureDone",this);
        LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!");
        localLog("-I- Configuration completed");

      }
  }
  catch (xcept::Exception &e) {
    reasonForFailedState_ = "configuring FAILED: " + (std::string)e.what();
    fsm_.fireFailed(reasonForFailedState_,this);
    localLog(reasonForFailedState_);
  }
  catch(cms::Exception &e) {
    reasonForFailedState_ = e.explainSelf();
    fsm_.fireFailed(reasonForFailedState_,this);
    localLog(reasonForFailedState_);
  }    
  catch(std::exception &e) {
    reasonForFailedState_ = e.what();
    fsm_.fireFailed(reasonForFailedState_,this);
    localLog(reasonForFailedState_);
  }
  catch(...) {
    fsm_.fireFailed("Unknown Exception",this);
  }

  if(vulture_!=0 && vp_ == 0) vp_ = vulture_->makeProcess();

  return false;
}
void evf::FUEventProcessor::css ( xgi::Input *  in,
xgi::Output *  out 
) throw (xgi::exception::Exception) [inline]

Definition at line 104 of file FUEventProcessor.h.

References evf::Css::css(), and css_.

Referenced by FUEventProcessor().

    {
      css_.css(in,out);
    }
void FUEventProcessor::defaultWebPage ( xgi::Input *  in,
xgi::Output *  out 
) throw (xgi::exception::Exception)

Definition at line 781 of file FUEventProcessor.cc.

References nbSubProcesses_, and dbtoconf::out.

Referenced by FUEventProcessor(), and receivingAndMonitor().

{


  *out << "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">" 
       << "<html><head><title>" << getApplicationDescriptor()->getClassName() << (nbSubProcesses_.value_ > 0 ? "MP " : " ") 
       << getApplicationDescriptor()->getInstance() << "</title>"
       << "<meta https-equiv=\"REFRESH\" content=\"0;url=/evf/html/defaultBasePage.html\">"
       << "</head></html>";
}
void FUEventProcessor::detachDqmFromShm ( ) throw (evf::Exception) [private]

Definition at line 942 of file FUEventProcessor.cc.

References alignCSCRings::e, evtProcessor_, edm::Service< T >::isAvailable(), summarizeEdmComparisonLogfiles::success, and cms::Exception::what().

Referenced by stopClassic().

{
  std::string errmsg;
  bool success = false;
  try {
    edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
    if(edm::Service<FUShmDQMOutputService>().isAvailable())
      success = edm::Service<FUShmDQMOutputService>()->detachFromShm();
    if (!success) errmsg = "Failed to detach DQM service from shared memory";
  }
  catch (cms::Exception& e) {
    errmsg = "Failed to detach DQM service from shared memory: " + (std::string)e.what();
  }
  if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
}
bool FUEventProcessor::doEndRunInEDM ( ) [private]

Definition at line 574 of file FUEventProcessor.cc.

References evf::moduleweb::ForkInfoObj::control_sem_, prof2calltree::count, edm_init_done_, evtProcessor_, forkInfoObj_, evf::moduleweb::ForkInfoObj::lock(), log_, edm::event_processor::sJobReady, stor::utils::sleep(), edm::event_processor::sRunning, edm::event_processor::sStopping, evf::moduleweb::ForkInfoObj::stopCondition, and evf::moduleweb::ForkInfoObj::unlock().

Referenced by halting(), and stopping().

                                     {
  //taking care that EP in master is in stoppable state
  if (forkInfoObj_) {

    int count = 30;
    while (!edm_init_done_ && count) {
      ::sleep(1);
      if (count%5==0)
        LOG4CPLUS_WARN(log_,"MASTER EP: Stopping while EP busy in beginRun. waiting " <<count<< "sec");
      count--;
    }
    //if (count==0) fsm_.fireFailed("failed to stop Master EP",this);

    if (evtProcessor_->getState()==edm::event_processor::sJobReady)
      return true;//we are already done

    forkInfoObj_->lock();
    forkInfoObj_->stopCondition=true;
    sem_post(forkInfoObj_->control_sem_);
    forkInfoObj_->unlock();

    count = 30;

    edm::event_processor::State st;
    while(count--) {
      st = evtProcessor_->getState();
      if (st==edm::event_processor::sRunning) ::usleep(100000);
      else if (st==edm::event_processor::sStopping || st==edm::event_processor::sJobReady) {
        break;
      }
      else {
        LOG4CPLUS_ERROR(getApplicationLogger(),
            "Master edm::EventProcessor is in state "<< evtProcessor_->stateName(st) << " while stopping");
        return false;
      }
    }
    if (count<0) {
      LOG4CPLUS_ERROR(getApplicationLogger(),
          "Timeout waiting for Master edm::EventProcessor to go stopping state:"<<evtProcessor_->stateName(st));
      return false;
    }
  }
  return true;
}
bool FUEventProcessor::enableClassic ( ) [private]

Definition at line 1964 of file FUEventProcessor.cc.

References enableCommon(), evtProcessor_, localLog(), stor::utils::sleep(), and edm::event_processor::sRunning.

Referenced by enabling().

{
  bool retval = enableCommon();
  while(evtProcessor_->getState()!= edm::event_processor::sRunning){
    LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
    ::sleep(1);
  }
  
  //  implementation moved to EPWrapper
  //  startScalersWorkLoop(); // this is now not done any longer 
  localLog("-I- Start completed");
  return retval;
}
bool FUEventProcessor::enableCommon ( ) [private]

Definition at line 1674 of file FUEventProcessor.cc.

References attachDqmToShm(), gather_cfg::cout, alignCSCRings::e, evtProcessor_, exception, Exception, cms::Exception::explainSelf(), evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, hasShMem_, isRunNumberSetter_, localLog(), reasonForFailedState_, runNumber_, and stor::utils::sleep().

Referenced by enableClassic(), and enableMPEPSlave().

{
  try {    
    if(hasShMem_) attachDqmToShm();
    int sc = 0;
    evtProcessor_->clearCounters();
    if(isRunNumberSetter_)
      evtProcessor_->setRunNumber(runNumber_.value_);
    else
      evtProcessor_->declareRunNumber(runNumber_.value_);

    try{
      ::sleep(1);
      evtProcessor_->runAsync();
      sc = evtProcessor_->statusAsync();
    }
    catch(cms::Exception &e) {
      reasonForFailedState_ = e.explainSelf();
      fsm_.fireFailed(reasonForFailedState_,this);
      localLog(reasonForFailedState_);
      return false;
    }    
    catch(std::exception &e) {
      reasonForFailedState_  = e.what();
      fsm_.fireFailed(reasonForFailedState_,this);
      localLog(reasonForFailedState_);
      return false;
    }
    catch(...) {
      reasonForFailedState_ = "Unknown Exception";
      fsm_.fireFailed(reasonForFailedState_,this);
      localLog(reasonForFailedState_);
      return false;
    }
    if(sc != 0) {
      std::ostringstream oss;
      oss<<"EventProcessor::runAsync returned status code " << sc;
      reasonForFailedState_ = oss.str();
      fsm_.fireFailed(reasonForFailedState_,this);
      localLog(reasonForFailedState_);
      return false;
    }
  }
  catch (xcept::Exception &e) {
    reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
    fsm_.fireFailed(reasonForFailedState_,this);
    localLog(reasonForFailedState_);
    return false;
  }
  try{
    fsm_.fireEvent("EnableDone",this);
  }
  catch (xcept::Exception &e) {
    std::cout << "exception " << (std::string)e.what() << std::endl;
    throw;
  }

  return false;
}
bool FUEventProcessor::enableForkInEDM ( ) [private]

Definition at line 1885 of file FUEventProcessor.cc.

References alignCSCRings::e, evtProcessor_, exception, cms::Exception::explainSelf(), evf::StateMachine::fireFailed(), evf::moduleweb::ForkInfoObj::forkHandler, forkInfoObj_, forkObjLock_, evf::moduleweb::ForkInfoObj::forkParams, forkProcessFromEDM_helper(), fsm_, evf::moduleweb::ForkInfoObj::fuAddr, isRunNumberSetter_, log_, evf::moduleweb::ForkInfoObj::mst_lock_, mwrRef_, evf::ModuleWebRegistry::publishForkInfo(), reasonForFailedState_, evf::FWEPWrapper::resetWaiting(), runNumber_, and evf::moduleweb::ForkInfoObj::stopCondition.

Referenced by enabling().

{
  evtProcessor_.resetWaiting();
  try {
    //set to connect to Shm later
    //if(hasShMem_) setAttachDqmToShm();

    int sc = 0;
    //maybe not needed in MP mode
    evtProcessor_->clearCounters();
    if(isRunNumberSetter_)
      evtProcessor_->setRunNumber(runNumber_.value_);
    else
      evtProcessor_->declareRunNumber(runNumber_.value_);

    //prepare object used to communicate with DaqSource
    pthread_mutex_destroy(&forkObjLock_);
    pthread_mutex_init(&forkObjLock_,0);
    if (forkInfoObj_) delete forkInfoObj_;
    forkInfoObj_ = new moduleweb::ForkInfoObj();
    forkInfoObj_->mst_lock_=&forkObjLock_;
    forkInfoObj_->fuAddr=(void*)this;
    forkInfoObj_->forkHandler = forkProcessFromEDM_helper;
    forkInfoObj_->forkParams.slotId=-1;
    forkInfoObj_->forkParams.restart=0;
    forkInfoObj_->forkParams.isMaster=-1;
    forkInfoObj_->stopCondition=0;
    if (mwrRef_)
      mwrRef_->publishForkInfo(std::string("DaqSource"),forkInfoObj_);

    evtProcessor_->runAsync();
    sc = evtProcessor_->statusAsync();

    if(sc != 0) {
      std::ostringstream oss;
      oss<<"EventProcessor::runAsync returned status code " << sc;
      reasonForFailedState_ = oss.str();
      fsm_.fireFailed(reasonForFailedState_,this);
      LOG4CPLUS_FATAL(log_,reasonForFailedState_);
      return false;
    }
  }
  //catch exceptions on master side
  catch(cms::Exception &e) {
    reasonForFailedState_ = e.explainSelf();
    fsm_.fireFailed(reasonForFailedState_,this);
    LOG4CPLUS_FATAL(log_,reasonForFailedState_);
    return false;
  }    
  catch(std::exception &e) {
    reasonForFailedState_  = e.what();
    fsm_.fireFailed(reasonForFailedState_,this);
    LOG4CPLUS_FATAL(log_,reasonForFailedState_);
    return false;
  }
  catch(...) {
    reasonForFailedState_ = "Unknown Exception";
    fsm_.fireFailed(reasonForFailedState_,this);
    LOG4CPLUS_FATAL(log_,reasonForFailedState_);
    return false;
  }
  return true;
}
bool FUEventProcessor::enableMPEPSlave ( ) [private]

Definition at line 1977 of file FUEventProcessor.cc.

References alignCSCRings::e, enableCommon(), evtProcessor_, Exception, evf::StateMachine::fireFailed(), fsm_, reco::get(), evf::FWEPWrapper::isWaitingForLs(), localLog(), edm::PresenceFactory::makePresence(), reasonForFailedState_, evf::FWEPWrapper::resetWaiting(), ML::MLlog4cplus::setAppl(), stor::utils::sleep(), startReceivingLoop(), startReceivingMonitorLoop(), and startScalersWorkLoop().

Referenced by enabling(), and supervisor().

{
  //all this happens only in the child process

  startReceivingLoop();
  startReceivingMonitorLoop();
  evtProcessor_.resetWaiting();
  startScalersWorkLoop();
  while(!evtProcessor_.isWaitingForLs())
    ::sleep(1);

  // @EM test do not run monitor loop in slave, only receiving&Monitor
  //  evtProcessor_.startMonitoringWorkLoop();
  try{
    //    evtProcessor_.makeServicesOnly();
    try{
      LOG4CPLUS_DEBUG(getApplicationLogger(),
                      "Trying to create message service presence ");
      edm::PresenceFactory *pf = edm::PresenceFactory::get();
      if(pf != 0) {
        pf->makePresence("MessageServicePresence").release();
      }
      else {
        LOG4CPLUS_ERROR(getApplicationLogger(),
                        "Unable to create message service presence ");
      }
    } 
    catch(...) {
      LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
    }
  ML::MLlog4cplus::setAppl(this);
  }       
  catch (xcept::Exception &e) {
    reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
    fsm_.fireFailed(reasonForFailedState_,this);
    localLog(reasonForFailedState_);
  }
  bool retval =  enableCommon();
  //  while(evtProcessor_->getState()!= edm::event_processor::sRunning){
  //    LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
  //    ::sleep(1);
  //  }
  return retval;
}
bool FUEventProcessor::enabling ( toolbox::task::WorkLoop *  wl)

Definition at line 422 of file FUEventProcessor.cc.

References toolbox::mem::_s_mutex_ptr_, configString_, evf::FWEPWrapper::configuration(), configuration_, cpustat_, evf::StateMachine::disableRcmsStateNotification(), alignCSCRings::e, edm_init_done_, enableClassic(), enableForkInEDM(), enableMPEPSlave(), epInitialized_, evtProcessor_, Exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), evf::FWEPWrapper::forceInitEventProcessorMaybe(), forkInEDM_, fsm_, evf::FWEPWrapper::getmicromap(), evf::FWEPWrapper::getModuleWebRegistry(), evf::FWEPWrapper::getNumberOfMicrostates(), evf::FWEPWrapper::getShmOutputModuleRegistry(), evf::ShmOutputModuleRegistry::getShmOutputModules(), hasModuleWebRegistry_, hasPrescaleService_, hasServiceWebRegistry_, i, iDieStatisticsGathering_, iDieUrl_, evf::FWEPWrapper::init(), instance_, localLog(), mwrRef_, myProcess_, nbSubProcesses_, nbTotalDQM_, pickup_lock_, ratestat_, reasonForFailedState_, evf::FWEPWrapper::resetLumiSectionReferenceIndex(), runNumber_, scalersLegendaInfoSpace_, scalersUpdates_, evf::CPUStat::sendLegenda(), evf::RateStat::sendLegenda(), edm::event_processor::sError, edm::event_processor::sInvalid, sorRef_, edm::event_processor::sRunning, evf::Vulture::start(), start_lock_, startSummarizeWorkLoop(), stop_lock_, subs_, vp_, and vulture_.

{
  nbTotalDQM_ = 0;
  scalersUpdates_ = 0;
//   std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
//          << ((instance_.value_==0) ? 0x8 : 0) << " "
//          << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
//          << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
//          << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
  unsigned short smap 
    = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
    + (((instance_.value_%80)==0) ? 0x8 : 0) // have at least one legend per slice
    + (hasServiceWebRegistry_.value_ ? 0x4 : 0) 
    + (hasModuleWebRegistry_.value_ ? 0x2 : 0) 
    + (hasPrescaleService_.value_ ? 0x1 : 0);

  LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling...");
  if(!epInitialized_){
    evtProcessor_.forceInitEventProcessorMaybe();
  }
  std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
  
  mwrRef_ = evtProcessor_.getModuleWebRegistry();
  sorRef_ = evtProcessor_.getShmOutputModuleRegistry();

  if(!epInitialized_){
    evtProcessor_->beginJob(); 
    if(cpustat_) {delete cpustat_; cpustat_=0;}
    cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
                           nbSubProcesses_.value_,
                           instance_.value_,
                           iDieUrl_.value_);
    if(ratestat_) {delete ratestat_; ratestat_=0;}
    ratestat_ = new RateStat(iDieUrl_.value_);
    if(iDieStatisticsGathering_.value_){
      try
        {
        cpustat_->sendLegenda(evtProcessor_.getmicromap());
        xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
        if(legenda !=0)
          {
            std::string slegenda = ((xdata::String*)legenda)->value_;
            ratestat_->sendLegenda(slegenda);
          }
        }
      catch(evf::Exception &e)
        {
          LOG4CPLUS_INFO(getApplicationLogger(),"could not send legenda"
                         << e.what());
        }
      catch (xcept::Exception& e) {
        LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda."
                        << e.what());
      }
    }
    epInitialized_ = true;
  }
  configuration_ = evtProcessor_.configuration(); // get it again after init has been carried out...
  evtProcessor_.resetLumiSectionReferenceIndex();
  //classic appl will return here 
  if(nbSubProcesses_.value_==0) return enableClassic();
  //protect manipulation of subprocess array
  pthread_mutex_lock(&start_lock_);
  pthread_mutex_lock(&pickup_lock_);
  subs_.clear();
  subs_.resize(nbSubProcesses_.value_); // this should not be necessary
  pid_t retval = -1;

  for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
    {
      subs_[i]=SubProcess(i,retval); //this will replace all the scattered variables
    }
  pthread_mutex_unlock(&pickup_lock_);

  pthread_mutex_unlock(&start_lock_);

  //set expected number of child EP's for the Init message(s) sent to the SM
  try {
    if (sorRef_) {
      unsigned int nbExpectedEPWriters = nbSubProcesses_.value_;
      if (nbExpectedEPWriters==0) nbExpectedEPWriters=1;//master instance processing
      std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
      for (unsigned int i=0;i<shmOutputs.size();i++) {
        shmOutputs[i]->setNExpectedEPs(nbExpectedEPWriters);
      }
    }
  }
  catch (...)
  {
    LOG4CPLUS_ERROR(getApplicationLogger(),"Thrown Exception while setting nExpectedEPs in shmOutputs");
  }

  //use new method if configured
  edm_init_done_=true;
  if (forkInEDM_.value_) {
    edm_init_done_=false;
    enableForkInEDM();
  }
  else
    for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
    {
      retval = subs_[i].forkNew();
      if(retval==0)
        {
          myProcess_ = &subs_[i];
          // dirty hack: delete/recreate global binary semaphore for later use in child
          delete toolbox::mem::_s_mutex_ptr_;
          toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
          int retval = pthread_mutex_destroy(&stop_lock_);
          if(retval != 0) perror("error");
          retval = pthread_mutex_init(&stop_lock_,0);
          if(retval != 0) perror("error");
          fsm_.disableRcmsStateNotification();
          
          return enableMPEPSlave();
          // the loop is broken in the child 
        }
    }

  if (!edm_init_done_) {
    //enable while we wait for beginRun/conditions to load
    LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
    fsm_.fireEvent("EnableDone",this);
    localLog("-I- Start completed");

    edm::event_processor::State st;
    while (!edm_init_done_) {
      usleep(10000);
      st = evtProcessor_->getState();
      if (st==edm::event_processor::sError || st==edm::event_processor::sInvalid) break;
    }
    st = evtProcessor_->getState();
    //error handling: EP must fork during sRunning
    if (st!=edm::event_processor::sRunning) {
      reasonForFailedState_ = std::string("Master edm::EventProcessor in state ") + evtProcessor_->stateName(st);
      localLog(reasonForFailedState_);
      fsm_.fireFailed(reasonForFailedState_,this);
      return false;
    }
    startSummarizeWorkLoop();
    vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
    return false;
  }

  startSummarizeWorkLoop();
  vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
  LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
  fsm_.fireEvent("EnableDone",this);
  localLog("-I- Start completed");
  return false;
}
void FUEventProcessor::forkProcessesFromEDM ( ) [private]

Definition at line 1738 of file FUEventProcessor.cc.

References toolbox::mem::_s_mutex_ptr_, evf::FWEPWrapper::adjustLsIndexForRestart(), applicationInfoSpace_, attachDqmToShm(), gather_cfg::cout, evf::StateMachine::disableRcmsStateNotification(), alignCSCRings::e, edm_init_done_, evtProcessor_, exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), forkInfoObj_, evf::moduleweb::ForkInfoObj::forkParams, fsm_, reco::get(), evf::ShmOutputModuleRegistry::getShmOutputModules(), hasShMem_, i, evf::moduleweb::ForkParams::isMaster, evf::FWEPWrapper::isWaitingForLs(), localLog(), edm::PresenceFactory::makePresence(), myProcess_, nbSubProcesses_, reasonForFailedState_, evf::FWEPWrapper::resetPackedTriggerReport(), evf::moduleweb::ForkParams::restart, scalersUpdates_, ML::MLlog4cplus::setAppl(), evf::moduleweb::ForkParams::slotId, sorRef_, startReceivingLoop(), startReceivingMonitorLoop(), startScalersWorkLoop(), stop_lock_, and subs_.

                                            {

  moduleweb::ForkParams * forkParams = &(forkInfoObj_->forkParams);
  unsigned int forkFrom=0;
  unsigned int forkTo=nbSubProcesses_.value_;
  if (forkParams->slotId>=0) {
    forkFrom=forkParams->slotId;
    forkTo=forkParams->slotId+1;
  }

  //before fork, make sure to disconnect output modules from Shm
  try {
    if (sorRef_) {
      std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
      for (unsigned int i=0;i<shmOutputs.size();i++) {
        //unregister PID from ShmBuffer/RB
        shmOutputs[i]->unregisterFromShm();
        //disconnect from Shm
        shmOutputs[i]->stop();
      }
    }
  }
  catch (std::exception &e)
  {
    reasonForFailedState_ =  (std::string)"Thrown exception while disconnecting ShmOutputModule from Shm: " + e.what();
    LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
    fsm_.fireFailed(reasonForFailedState_,this);
    localLog(reasonForFailedState_);
  }
  catch (...) {
    reasonForFailedState_ =  "Thrown unknown exception while disconnecting ShmOutputModule from Shm: ";
    LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
    fsm_.fireFailed(reasonForFailedState_,this);
    localLog(reasonForFailedState_);
  }

  //fork loop
  for(unsigned int i=forkFrom; i<forkTo; i++)
  {

    int retval = subs_[i].forkNew();
    if(retval==0)
    {

      forkParams->isMaster=0;
      myProcess_ = &subs_[i];
      // dirty hack: delete/recreate global binary semaphore for later use in child
      delete toolbox::mem::_s_mutex_ptr_;
      toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
      int retval = pthread_mutex_destroy(&stop_lock_);
      if(retval != 0) perror("error");
      retval = pthread_mutex_init(&stop_lock_,0);
      if(retval != 0) perror("error");
      fsm_.disableRcmsStateNotification();

      try{
        LOG4CPLUS_DEBUG(getApplicationLogger(),
            "Trying to create message service presence ");
        //release the presense factory in master
        edm::PresenceFactory *pf = edm::PresenceFactory::get();
        if(pf != 0) {
          pf->makePresence("MessageServicePresence").release();
        }
        else {
          LOG4CPLUS_ERROR(getApplicationLogger(),
              "Unable to create message service presence ");
        }
      }
      catch(...) {
        LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception in MessageServicePresence");
      }

      ML::MLlog4cplus::setAppl(this);

      //reconnect to Shm from output modules
      try {
        if (sorRef_) {
          std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
          for (unsigned int i=0;i<shmOutputs.size();i++)
            shmOutputs[i]->start();
        }
      }
      catch (...)
      {
        LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception (ShmOutputModule sending InitMsg (pid:"<<getpid() <<")");
      }

      if (forkParams->restart) {
        //do restart things
        scalersUpdates_ = 0;
        fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
        fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition
        fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition
        try{
          xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
          if(lsid) {
            ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died
          }
        }
        catch(...){
          std::cout << "trouble with lsindex during restart" << std::endl;
        }
        try{
          xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
          if(lstb) {
            ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting
          }
        }
        catch(...){
          std::cout << "trouble with resetting flag for eol recovery " << std::endl;
        }
        evtProcessor_.adjustLsIndexForRestart();
        evtProcessor_.resetPackedTriggerReport();
      }

      //start other threads
      startReceivingLoop();
      startReceivingMonitorLoop();
      startScalersWorkLoop();
      while(!evtProcessor_.isWaitingForLs())
        ::usleep(100000);//wait for scalers loop to start

      //connect DQMShmOutputModule
      if(hasShMem_) attachDqmToShm();

      //catch transition error if we are already Enabled
      try {
        fsm_.fireEvent("EnableDone",this);
      }
      catch (...) {}
      //child return to DaqSource
      return ;
    }
    else {
      forkParams->isMaster=1;
      forkInfoObj_->forkParams.slotId=-1;
      forkInfoObj_->forkParams.restart=0;
      if (forkParams->restart) {
        std::ostringstream ost1;
        ost1 << "-I- New Process " << retval << " forked for slot " << forkParams->slotId;
        localLog(ost1.str());
      }
    }
  }
  edm_init_done_=true;
}
void FUEventProcessor::forkProcessFromEDM_helper ( void *  addr) [static]

Definition at line 1734 of file FUEventProcessor.cc.

Referenced by enableForkInEDM().

                                                            {
  ((FUEventProcessor*)addr)->forkProcessesFromEDM();
}
xoap::MessageReference FUEventProcessor::fsmCallback ( xoap::MessageReference  msg) throw (xoap::exception::Exception)

Definition at line 671 of file FUEventProcessor.cc.

References evf::StateMachine::commandCallback(), fsm_, and lumiQueryAPI::msg.

{
  return fsm_.commandCallback(msg);
}
bool FUEventProcessor::halting ( toolbox::task::WorkLoop *  wl)

Definition at line 640 of file FUEventProcessor.cc.

References doEndRunInEDM(), alignCSCRings::e, evtProcessor_, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), forkInEDM_, forkInfoObj_, fsm_, localLog(), nbSubProcesses_, reasonForFailedState_, evf::FWEPWrapper::stopAndHalt(), and stopSlavesAndAcknowledge().

{
  LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ...");
  if(nbSubProcesses_.value_!=0) { 
    stopSlavesAndAcknowledge();
    if (forkInEDM_.value_) doEndRunInEDM();
  }
  try{
    evtProcessor_.stopAndHalt();
    //cleanup forking variables
    if (forkInfoObj_) {
      delete forkInfoObj_;
      forkInfoObj_=0;
    }
  }
  catch (evf::Exception &e) {
    reasonForFailedState_ = "halting FAILED: " + (std::string)e.what();
    localLog(reasonForFailedState_);
    fsm_.fireFailed(reasonForFailedState_,this);
  }
  //  if(hasShMem_) detachDqmFromShm();

  LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!");
  fsm_.fireEvent("HaltDone",this);

  localLog("-I- Halt completed");
  return false;
}
void FUEventProcessor::localLog ( std::string  m) [private]

Definition at line 976 of file FUEventProcessor.cc.

References logRing_, logRingIndex_, logRingSize_, logWrap_, m, and cond::timestamp.

Referenced by configuring(), enableClassic(), enableCommon(), enableMPEPSlave(), enabling(), forkProcessesFromEDM(), halting(), stopClassic(), stopSlavesAndAcknowledge(), and supervisor().

{
  timeval tv;

  gettimeofday(&tv,0);
  tm *uptm = localtime(&tv.tv_sec);
  char datestring[256];
  strftime(datestring, sizeof(datestring),"%c", uptm);

  if(logRingIndex_ == 0){logWrap_ = true; logRingIndex_ = logRingSize_;}
  logRingIndex_--;
  std::ostringstream timestamp;
  timestamp << " at " << datestring;
  m += timestamp.str();
  logRing_[logRingIndex_] = m;
}
std::string FUEventProcessor::logsAsString ( ) [private]

Definition at line 959 of file FUEventProcessor.cc.

References i, logRing_, logRingIndex_, and logWrap_.

{
  std::ostringstream oss;
  if(logWrap_)
    {
      for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
        oss << logRing_[i] << std::endl;
      for(unsigned int i = 0; i <  logRingIndex_; i++)
        oss << logRing_[i] << std::endl;
    }
  else
      for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
        oss << logRing_[i] << std::endl;
    
  return oss.str();
}
void FUEventProcessor::makeStaticInfo ( ) [private]

Definition at line 2291 of file FUEventProcessor.cc.

References applicationInfoSpace_, evf::utils::cDiv(), alignCSCRings::e, Exception, edm::getReleaseVersion(), hasModuleWebRegistry_, hasServiceWebRegistry_, hasShMem_, evf::utils::mDiv(), outPut_, and updaterStatic_.

Referenced by FUEventProcessor().

{
  using namespace utils;
  std::ostringstream ost;
  mDiv(&ost,"ve");
  ost<< "$Revision: 1.135 $ (" << edm::getReleaseVersion() <<")";
  cDiv(&ost);
  mDiv(&ost,"ou",outPut_.toString());
  mDiv(&ost,"sh",hasShMem_.toString());
  mDiv(&ost,"mw",hasModuleWebRegistry_.toString());
  mDiv(&ost,"sw",hasServiceWebRegistry_.toString());
  
  xdata::Serializable *monsleep = 0;
  xdata::Serializable *lstimeout = 0;
  try{
    monsleep  = applicationInfoSpace_->find("monSleepSec");
    lstimeout = applicationInfoSpace_->find("lsTimeOut");
  }
  catch(xdata::exception::Exception e){
  }
  
  if(monsleep!=0)
    mDiv(&ost,"ms",monsleep->toString());
  if(lstimeout!=0)
    mDiv(&ost,"lst",lstimeout->toString());
  char cbuf[sizeof(struct utsname)];
  struct utsname* buf = (struct utsname*)cbuf;
  uname(buf);
  mDiv(&ost,"sysinfo");
  ost << buf->sysname << " " << buf->nodename 
      << " " << buf->release << " " << buf->version << " " << buf->machine;
  cDiv(&ost);
  updaterStatic_ = ost.str();
}
void FUEventProcessor::microState ( xgi::Input *  in,
xgi::Output *  out 
)

Definition at line 2097 of file FUEventProcessor.cc.

References autoRestartSlaves_, gather_cfg::cout, alignCSCRings::e, evtProcessor_, exception, fsm_, evf::FWEPWrapper::getScalersUpdates(), i, evf::FWEPWrapper::microState(), evf::FWEPWrapper::moduleNameFromIndex(), myProcess_, nbdead_, nblive_, nbSubProcesses_, nbTotalDQM_, pickup_lock_, start_lock_, evf::StateMachine::stateName(), evf::FWEPWrapper::stateNameFromIndex(), subs_, and cms::Exception::what().

Referenced by FUEventProcessor().

{
  std::string urn = getApplicationDescriptor()->getURN();
  try{
    evtProcessor_.stateNameFromIndex(0);
    evtProcessor_.moduleNameFromIndex(0);
  if(myProcess_) {std::cout << "microstate called for child! bail out" << std::endl; return;}
  *out << "<tr><td>" << fsm_.stateName()->toString() 
       << "</td><td>"<< (myProcess_ ? "S" : "M") <<"</td><td>" << nblive_ << "</td><td>"
       << nbdead_ << "</td><td><a href=\"/" << urn << "/procStat\">" << getpid() <<"</a></td>";
  evtProcessor_.microState(in,out);
  *out << "<td></td><td>" << nbTotalDQM_ 
       << "</td><td>" << evtProcessor_.getScalersUpdates() << "</td></tr>";
  if(nbSubProcesses_.value_!=0 && !myProcess_) 
    {
      pthread_mutex_lock(&start_lock_);
      for(unsigned int i = 0; i < subs_.size(); i++)
        {
          try{
            if(subs_[i].alive()>0)
              {
                *out << "<tr><td  bgcolor=\"#00FF00\" id=\"a"
                     << i << "\">""Alive</td><td>S</td><td>"
                     << subs_[i].queueId() << "<td>" 
                     << subs_[i].queueStatus()<< "/"
                     << subs_[i].queueOccupancy() << "/"
                     << subs_[i].queuePidOfLastSend() << "/"
                     << subs_[i].queuePidOfLastReceive() 
                     << "</td><td><a id=\"p"<< i << "\" href=\"SubWeb?process=" 
                     << subs_[i].pid() << "&method=procStat\">" 
                     << subs_[i].pid()<<"</a></td>" //<< msg->mtext;
                     << "<td>" << evtProcessor_.stateNameFromIndex(subs_[i].params().Ms) << "</td><td>" 
                     << evtProcessor_.moduleNameFromIndex(subs_[i].params().ms) << "</td><td>" 
                     << subs_[i].params().nba << "/" << subs_[i].params().nbp 
                     << " (" << float(subs_[i].params().nba)/float(subs_[i].params().nbp)*100. <<"%)" 
                     << "</td><td>" << subs_[i].params().ls  << "/" << subs_[i].params().ls 
                     << "</td><td>" << subs_[i].params().ps 
                     << "</td><td" 
                     << ((subs_[i].params().eols<subs_[i].params().ls) ? " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")  << ">" 
                     << subs_[i].params().eols  
                     << "</td><td>" << subs_[i].params().dqm 
                     << "</td><td>" << subs_[i].params().trp << "</td>";
              }
            else 
              {
                pthread_mutex_lock(&pickup_lock_);
                *out << "<tr><td id=\"a"<< i << "\" ";
                if(subs_[i].alive()==-1000)
                  *out << " bgcolor=\"#bbaabb\">NotInitialized";
                else
                  *out << (subs_[i].alive()==0 ? ">Done" : " bgcolor=\"#FF0000\">Dead");
                *out << "</td><td>S</td><td>"<< subs_[i].queueId() << "<td>" 
                     << subs_[i].queueStatus() << "/"
                     << subs_[i].queueOccupancy() << "/"
                     << subs_[i].queuePidOfLastSend() << "/"
                     << subs_[i].queuePidOfLastReceive() 
                     << "</td><td id=\"p"<< i << "\">"
                     <<subs_[i].pid()<<"</td><td colspan=\"5\">" << subs_[i].reasonForFailed();
                if(subs_[i].alive()!=0 && subs_[i].alive()!=-1000) 
                  {
                    if(autoRestartSlaves_ && subs_[i].restartCount()<=2) 
                      *out << " will restart in " << subs_[i].countdown() << " s";
                    else if(autoRestartSlaves_)
                      *out << " reached maximum restart count";
                    else *out << " autoRestart is disabled ";
                  }
                *out << "</td><td" 
                     << ((subs_[i].params().eols<subs_[i].params().ls) ? 
                         " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")  
                     << ">" 
                     << subs_[i].params().eols  
                     << "</td><td>" << subs_[i].params().dqm 
                     << "</td><td>" << subs_[i].params().trp << "</td>";
                pthread_mutex_unlock(&pickup_lock_);
              }
            *out << "</tr>";
          }
          catch(evf::Exception &e){
            *out << "<tr><td id=\"a"<< i << "\" " 
                 <<"bgcolor=\"#FFFF00\">NotResp</td><td>S</td><td>"<< subs_[i].queueId() << "<td>" 
                 << subs_[i].queueStatus() << "/"
                 << subs_[i].queueOccupancy() << "/"
                 << subs_[i].queuePidOfLastSend() << "/"
                 << subs_[i].queuePidOfLastReceive() 
                 << "</td><td id=\"p"<< i << "\">"
                 <<subs_[i].pid()<<"</td>";
          }
        }
      pthread_mutex_unlock(&start_lock_); 
    }
  }
      catch(evf::Exception &e)
      {
        LOG4CPLUS_INFO(getApplicationLogger(),"evf::Exception caught in microstate - " << e.what());    
      }
    catch(cms::Exception &e)
      {
        LOG4CPLUS_INFO(getApplicationLogger(),"cms::Exception caught in microstate - " << e.what());    
      }
    catch(std::exception &e)
      {
        LOG4CPLUS_INFO(getApplicationLogger(),"std::Exception caught in microstate - " << e.what());    
      }
    catch(...)
      {
        LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception caught in microstate - ");    
      }

}
void evf::FUEventProcessor::moduleWeb ( xgi::Input *  in,
xgi::Output *  out 
) [inline]

Definition at line 110 of file FUEventProcessor.h.

References evtProcessor_, and evf::FWEPWrapper::moduleWeb().

Referenced by FUEventProcessor(), and receivingAndMonitor().

{evtProcessor_.moduleWeb(in,out);}
void FUEventProcessor::pathNames ( xgi::Input *  in,
xgi::Output *  out 
) throw (xgi::exception::Exception)

Definition at line 895 of file FUEventProcessor.cc.

References evtProcessor_, dbtoconf::out, and scalersLegendaInfoSpace_.

Referenced by FUEventProcessor().

{

  if(evtProcessor_ != 0){
    xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
    if(legenda !=0){
      std::string slegenda = ((xdata::String*)legenda)->value_;
      *out << slegenda << std::endl;
    }
  }
}
void FUEventProcessor::procStat ( xgi::Input *  in,
xgi::Output *  out 
)

Definition at line 2281 of file FUEventProcessor.cc.

Referenced by FUEventProcessor(), and receivingAndMonitor().

bool FUEventProcessor::receiving ( toolbox::task::WorkLoop *  wl) [private]

Definition at line 1048 of file FUEventProcessor.cc.

References alignCSCRings::e, evf::StateMachine::fireEvent(), fsm_, reco::get(), edm::PresenceFactory::makePresence(), lumiQueryAPI::msg, MSQM_MESSAGE_TYPE_FSTOP, MSQM_MESSAGE_TYPE_STOP, MSQS_MESSAGE_TYPE_STOP, myProcess_, evf::SubProcess::postSlave(), evf::SubProcess::rcvSlave(), stop_lock_, and stopClassic().

Referenced by startReceivingLoop().

{
  MsgBuf msg;
  try{
    myProcess_->rcvSlave(msg,false); //will receive only messages from Master
    if(msg->mtype==MSQM_MESSAGE_TYPE_STOP)
      {
        pthread_mutex_lock(&stop_lock_);
        fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
        try{
          LOG4CPLUS_DEBUG(getApplicationLogger(),
                          "Trying to create message service presence ");
          edm::PresenceFactory *pf = edm::PresenceFactory::get();
          if(pf != 0) {
            pf->makePresence("MessageServicePresence").release();
          }
          else {
            LOG4CPLUS_ERROR(getApplicationLogger(),
                            "Unable to create message service presence ");
          }
        } 
        catch(...) {
          LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
        }
        stopClassic(); // call the normal sequence of stopping - as this is allowed to fail provisions must be made ...@@@EM
        MsgBuf msg1(0,MSQS_MESSAGE_TYPE_STOP);
        myProcess_->postSlave(msg1,false);
        pthread_mutex_unlock(&stop_lock_);
        fclose(stdout);
        fclose(stderr);
        _exit(EXIT_SUCCESS);
      }
    if(msg->mtype==MSQM_MESSAGE_TYPE_FSTOP)
      _exit(EXIT_SUCCESS);
  }
  catch(evf::Exception &e){}
  return true;
}
bool FUEventProcessor::receivingAndMonitor ( toolbox::task::WorkLoop *  wl) [private]

Definition at line 1529 of file FUEventProcessor.cc.

References anonymousPipe_, applicationInfoSpace_, harvestRelVal::args, prof2calltree::count, gather_cfg::cout, cycle, AlCaHLTBitMon_QueryRunRegistry::data, defaultWebPage(), evf::prg::dqm, alignCSCRings::e, evf::prg::eols, evf::FWEPWrapper::epMAltState_, evf::FWEPWrapper::epmAltState_, evtProcessor_, Exception, exitOnError_, spr::find(), first, fsm_, evf::internal::MyCgi::getEnvironment(), recoMuon::in, Input, evf::FWEPWrapper::lastLumiUsingEol_, evf::prg::ls, evf::FWEPWrapper::lsid_, MAX_PIPE_BUFFER_SIZE, PFRecoTauDiscriminationAgainstElectronMVA_cfi::method, evf::FWEPWrapper::microState(), moduleWeb(), evf::FWEPWrapper::monitoring(), evf::prg::Ms, evf::prg::ms, MSQM_MESSAGE_TYPE_MCS, MSQM_MESSAGE_TYPE_PRG, MSQM_MESSAGE_TYPE_TRP, MSQM_MESSAGE_TYPE_WEB, MSQS_MESSAGE_TYPE_MCR, MSQS_MESSAGE_TYPE_WEB, myProcess_, evf::prg::nba, evf::prg::nbp, NUMERIC_MESSAGE_SIZE, dbtoconf::out, Output, PIPE_WRITE, pos, evf::SubProcess::postSlave(), procStat(), evf::prg::ps, evf::FWEPWrapper::psid_, o2o::query, evf::SubProcess::rcvSlave(), scalersUpdates_, edm::second(), edm::event_processor::sError, slave_message_monitoring_, slave_message_prr_, stor::utils::sleep(), spotlightWebPage(), edm::event_processor::sStopping, evf::StateMachine::stateName(), stop_lock_, evf::prg::trp, and TablePrint::write.

Referenced by startReceivingMonitorLoop().

{
  try{
    myProcess_->rcvSlave(slave_message_monitoring_,true); //will receive only messages from Master
    switch(slave_message_monitoring_->mtype)
      {
      case MSQM_MESSAGE_TYPE_MCS:
        {
          xgi::Input *in = 0;
          xgi::Output out;
          evtProcessor_.microState(in,&out);
          MsgBuf msg1(out.str().size(),MSQS_MESSAGE_TYPE_MCR);
          strncpy(msg1->mtext,out.str().c_str(),out.str().size());
          myProcess_->postSlave(msg1,true);
          break;
        }
      
      case MSQM_MESSAGE_TYPE_PRG:
        {
          xdata::Serializable *dqmp = 0;
          xdata::UnsignedInteger32 *dqm = 0;
          evtProcessor_.monitoring(0);
          try{
            dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
          }  catch(xdata::exception::Exception e){}
          if(dqmp!=0)
            dqm = (xdata::UnsignedInteger32*)dqmp;

          //      monitorInfoSpace_->lock();  
          prg * data           = (prg*)slave_message_prr_->mtext;
          data->ls             = evtProcessor_.lsid_;
          data->eols           = evtProcessor_.lastLumiUsingEol_;
          data->ps             = evtProcessor_.psid_;
          data->nbp            = evtProcessor_->totalEvents();
          data->nba            = evtProcessor_->totalEventsPassed();
          data->Ms             = evtProcessor_.epMAltState_.value_;
          data->ms             = evtProcessor_.epmAltState_.value_;
          if(dqm) data->dqm    = dqm->value_; else data->dqm = 0;
          data->trp            = scalersUpdates_;
          //      monitorInfoSpace_->unlock();  
          myProcess_->postSlave(slave_message_prr_,true);
          if(exitOnError_.value_)
          { 
            // after each monitoring cycle check if we are in inconsistent state and exit if configured to do so  
            //      std::cout << getpid() << "receivingAndMonitor: trying to acquire stop lock " << std::endl;
            if(data->Ms == edm::event_processor::sStopping || data->Ms == edm::event_processor::sError) 
              { 
                bool running = true;
                int count = 0;
                while(running){
                  int retval = pthread_mutex_lock(&stop_lock_);
                  if(retval != 0) perror("error");
                  running = fsm_.stateName()->toString()=="Enabled";
                  if(count>5) _exit(-1);
                  pthread_mutex_unlock(&stop_lock_);
                  if(running) {::sleep(1); count++;}
                }
              }
            
          }
          //      scalersUpdates_++;
          break;
        }
      case MSQM_MESSAGE_TYPE_WEB:
        {
          xgi::Input  *in = 0;
          xgi::Output out;
          unsigned int bytesToSend = 0;
          MsgBuf msg1(NUMERIC_MESSAGE_SIZE,MSQS_MESSAGE_TYPE_WEB);
          std::string query = slave_message_monitoring_->mtext;
          size_t pos = query.find_first_of("&");
          std::string method;
          std::string args;
          if(pos!=std::string::npos)  
            {
              method = query.substr(0,pos);
              args = query.substr(pos+1,query.length()-pos-1);
            }
          else
            method=query;

          if(method=="Spotlight")
            {
              spotlightWebPage(in,&out);
            }
          else if(method=="procStat")
            {
              procStat(in,&out);
            }
          else if(method=="moduleWeb")
            {
              internal::MyCgi mycgi;
              boost::char_separator<char> sep(";");
              boost::tokenizer<boost::char_separator<char> > tokens(args, sep);
              for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
                   tok_iter != tokens.end(); ++tok_iter){
                size_t pos = (*tok_iter).find_first_of("%");
                if(pos != std::string::npos){
                  std::string first  = (*tok_iter).substr(0    ,                        pos);
                  std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
                  mycgi.getEnvironment()[first]=second;
                }
              }
              moduleWeb(&mycgi,&out);
            }
          else if(method=="Default")
            {
              defaultWebPage(in,&out);
            }
          else 
            {
              out << "Error 404!!!!!!!!" << std::endl;
            }


          bytesToSend = out.str().size();
          unsigned int cycle = 0;
          if(bytesToSend==0)
            {
              snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", bytesToSend);
              myProcess_->postSlave(msg1,true);
            }
          while(bytesToSend !=0){
            unsigned int msgSize = bytesToSend>MAX_PIPE_BUFFER_SIZE ? MAX_PIPE_BUFFER_SIZE : bytesToSend;
            write(anonymousPipe_[PIPE_WRITE],
                  out.str().c_str()+MAX_PIPE_BUFFER_SIZE*cycle,
                  msgSize);
            snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", msgSize);
            myProcess_->postSlave(msg1,true);
            bytesToSend -= msgSize;
            cycle++;
          }
          break;
        }
      case MSQM_MESSAGE_TYPE_TRP:
        {
          break;
        }
      }
  }
  catch(evf::Exception &e){std::cout << "exception caught in recevingM: " << e.what() << std::endl;}
  return true;
}
bool FUEventProcessor::restartForkInEDM ( unsigned int  slotId) [private]

Definition at line 1949 of file FUEventProcessor.cc.

References evf::moduleweb::ForkInfoObj::control_sem_, forkInfoObj_, evf::moduleweb::ForkInfoObj::forkParams, evf::moduleweb::ForkInfoObj::lock(), log_, evf::moduleweb::ForkInfoObj::stopCondition, and evf::moduleweb::ForkInfoObj::unlock().

Referenced by supervisor().

                                                           {
  //daqsource will keep this lock until master returns after fork
  //so that we don't do another EP restart in between
  forkInfoObj_->lock();
  forkInfoObj_->forkParams.slotId=slotId;
  forkInfoObj_->forkParams.restart=true;
  forkInfoObj_->forkParams.isMaster=1;
  forkInfoObj_->stopCondition=0;
  LOG4CPLUS_DEBUG(log_, " restarting subprocess in slot "<< slotId <<": posting on control semaphore");
  sem_post(forkInfoObj_->control_sem_);
  forkInfoObj_->unlock();
  usleep(1000);
  return true;
}
bool FUEventProcessor::scalers ( toolbox::task::WorkLoop *  wl) [private]

Definition at line 1409 of file FUEventProcessor.cc.

References gather_cfg::cout, evtProcessor_, evf::FWEPWrapper::fireScalersUpdate(), evf::FWEPWrapper::getPackedTriggerReport(), evf::FWEPWrapper::getTriggerReport(), myProcess_, evf::SubProcess::postSlave(), runTheMatrix::ret, scalersUpdates_, and wlScalersActive_.

Referenced by startScalersWorkLoop().

{
  if(evtProcessor_)
    {
      if(!evtProcessor_.getTriggerReport(true)) {
        wlScalersActive_ = false;
        return false;
      }
    }
  else
    {
      std::cout << getpid()<< " Scalers workloop, bailing out, no evtProcessor " << std::endl;
      wlScalersActive_ = false;
      return false;
    }
  if(myProcess_) 
    {
      //      std::cout << getpid() << "going to post on control queue from scalers" << std::endl;
      int ret = myProcess_->postSlave(evtProcessor_.getPackedTriggerReport(),false);
      if(ret!=0)      std::cout << "scalers workloop, error posting to sqs_ " << errno << std::endl;
      scalersUpdates_++;
    }
  else
    evtProcessor_.fireScalersUpdate();
  return true;
}
void FUEventProcessor::scalersWeb ( xgi::Input *  in,
xgi::Output *  out 
) throw (xgi::exception::Exception)

Definition at line 882 of file FUEventProcessor.cc.

References evtProcessor_, evf::FWEPWrapper::getPackedTriggerReportAsStruct(), and dbtoconf::out.

Referenced by FUEventProcessor().

{

  out->getHTTPResponseHeader().addHeader( "Content-Type",
                                          "application/octet-stream" );
  out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
                                          "binary" );
  if(evtProcessor_ != 0){
    out->write( (char*)(evtProcessor_.getPackedTriggerReportAsStruct()), sizeof(TriggerReportStatic) );
  }
}
void FUEventProcessor::sendMessageOverMonitorQueue ( MsgBuf buf)

Definition at line 2286 of file FUEventProcessor.cc.

References myProcess_, and evf::SubProcess::postSlave().

{
  if(myProcess_) myProcess_->postSlave(buf,true);
}
void evf::FUEventProcessor::serviceWeb ( xgi::Input *  in,
xgi::Output *  out 
) [inline]

Definition at line 111 of file FUEventProcessor.h.

References evtProcessor_, and evf::FWEPWrapper::serviceWeb().

Referenced by FUEventProcessor().

{evtProcessor_.serviceWeb(in,out);}
void FUEventProcessor::setAttachDqmToShm ( ) throw (evf::Exception) [private]

Definition at line 909 of file FUEventProcessor.cc.

References alignCSCRings::e, evtProcessor_, edm::Service< T >::isAvailable(), and cms::Exception::what().

{
  std::string errmsg;
  try {
    edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
    if(edm::Service<FUShmDQMOutputService>().isAvailable())
      edm::Service<FUShmDQMOutputService>()->setAttachToShm();
  }
  catch (cms::Exception& e) {
    errmsg = "Failed to set to attach DQM service to shared memory: " + (std::string)e.what();
  }
  if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
}
void FUEventProcessor::spotlightWebPage ( xgi::Input *  in,
xgi::Output *  out 
) throw (xgi::exception::Exception)

Definition at line 797 of file FUEventProcessor.cc.

References configuration_, evtProcessor_, fsm_, recoMuon::in, myProcess_, nbSubProcesses_, dbtoconf::out, evf::StateMachine::stateName(), evf::FWEPWrapper::summaryWebPage(), and evf::FWEPWrapper::taskWebPage().

Referenced by FUEventProcessor(), and receivingAndMonitor().

{

  std::string urn = getApplicationDescriptor()->getURN();

  *out << "<!-- base href=\"/" <<  urn
       << "\"> -->" << std::endl;
  *out << "<html>"                                                   << std::endl;
  *out << "<head>"                                                   << std::endl;
  *out << "<link type=\"text/css\" rel=\"stylesheet\"";
  *out << " href=\"/evf/html/styles.css\"/>"                   << std::endl;
  *out << "<title>" << getApplicationDescriptor()->getClassName() 
       << getApplicationDescriptor()->getInstance() 
       << " MAIN</title>"     << std::endl;
  *out << "</head>"                                                  << std::endl;
  *out << "<body>"                                                   << std::endl;
  *out << "<table border=\"0\" width=\"100%\">"                      << std::endl;
  *out << "<tr>"                                                     << std::endl;
  *out << "  <td align=\"left\">"                                    << std::endl;
  *out << "    <img"                                                 << std::endl;
  *out << "     align=\"middle\""                                    << std::endl;
  *out << "     src=\"/evf/images/spoticon.jpg\""                            << std::endl;
  *out << "     alt=\"main\""                                        << std::endl;
  *out << "     width=\"64\""                                        << std::endl;
  *out << "     height=\"64\""                                       << std::endl;
  *out << "     border=\"\"/>"                                       << std::endl;
  *out << "    <b>"                                                  << std::endl;
  *out << getApplicationDescriptor()->getClassName() 
       << getApplicationDescriptor()->getInstance()                  << std::endl;
  *out << "      " << fsm_.stateName()->toString()                   << std::endl;
  *out << "    </b>"                                                 << std::endl;
  *out << "  </td>"                                                  << std::endl;
  *out << "  <td width=\"32\">"                                      << std::endl;
  *out << "    <a href=\"/urn:xdaq-application:lid=3\">"             << std::endl;
  *out << "      <img"                                               << std::endl;
  *out << "       align=\"middle\""                                  << std::endl;
  *out << "       src=\"/hyperdaq/images/HyperDAQ.jpg\""             << std::endl;
  *out << "       alt=\"HyperDAQ\""                                  << std::endl;
  *out << "       width=\"32\""                                      << std::endl;
  *out << "       height=\"32\""                                     << std::endl;
  *out << "       border=\"\"/>"                                     << std::endl;
  *out << "    </a>"                                                 << std::endl;
  *out << "  </td>"                                                  << std::endl;
  *out << "  <td width=\"32\">"                                      << std::endl;
  *out << "  </td>"                                                  << std::endl;
  *out << "  <td width=\"32\">"                                      << std::endl;
  *out << "    <a href=\"/" << urn << "/\">"                         << std::endl;
  *out << "      <img"                                               << std::endl;
  *out << "       align=\"middle\""                                  << std::endl;
  *out << "       src=\"/evf/images/epicon.jpg\""                    << std::endl;
  *out << "       alt=\"main\""                                      << std::endl;
  *out << "       width=\"32\""                                      << std::endl;
  *out << "       height=\"32\""                                     << std::endl;
  *out << "       border=\"\"/>"                                     << std::endl;
  *out << "    </a>"                                                 << std::endl;
  *out << "  </td>"                                                  << std::endl;
  *out << "</tr>"                                                    << std::endl;
  *out << "</table>"                                                 << std::endl;

  *out << "<hr/>"                                                    << std::endl;
  
  std::ostringstream ost;
  if(myProcess_) 
    ost << "/SubWeb?process=" << getpid() << "&method=moduleWeb&";
  else
    ost << "/moduleWeb?";
  urn += ost.str();
  if(evtProcessor_ && (myProcess_ || nbSubProcesses_.value_==0))
    evtProcessor_.taskWebPage(in,out,urn);
  else if(evtProcessor_)
    evtProcessor_.summaryWebPage(in,out,urn);
  else
    *out << "<td>HLT Unconfigured</td>" << std::endl;
  *out << "</table>"                                                 << std::endl;
  
  *out << "<br><textarea rows=" << 10 << " cols=80 scroll=yes>"      << std::endl;
  *out << configuration_                                             << std::endl;
  *out << "</textarea><P>"                                           << std::endl;
  
  *out << "</body>"                                                  << std::endl;
  *out << "</html>"                                                  << std::endl;


}
void FUEventProcessor::startReceivingLoop ( ) [private]

Definition at line 1011 of file FUEventProcessor.cc.

References asReceiveMsgAndExecute_, alignCSCRings::e, Exception, lumiQueryAPI::msg, receiving(), receiving_, and wlReceiving_.

Referenced by enableMPEPSlave(), and forkProcessesFromEDM().

{
  try {
    wlReceiving_=
      toolbox::task::getWorkLoopFactory()->getWorkLoop("Receiving",
                                                       "waiting");
    if (!wlReceiving_->isActive()) wlReceiving_->activate();
    asReceiveMsgAndExecute_ = toolbox::task::bind(this,&FUEventProcessor::receiving,
                                        "Receiving");
    wlReceiving_->submit(asReceiveMsgAndExecute_);
    receiving_ = true;
  }
  catch (xcept::Exception& e) {
    std::string msg = "Failed to start workloop 'Receiving'.";
    XCEPT_RETHROW(evf::Exception,msg,e);
  }
}
void FUEventProcessor::startReceivingMonitorLoop ( ) [private]

Definition at line 1028 of file FUEventProcessor.cc.

References asReceiveMsgAndRead_, alignCSCRings::e, Exception, lumiQueryAPI::msg, receivingAndMonitor(), receivingM_, and wlReceivingMonitor_.

Referenced by enableMPEPSlave(), and forkProcessesFromEDM().

{
  try {
    wlReceivingMonitor_=
      toolbox::task::getWorkLoopFactory()->getWorkLoop("ReceivingM",
                                                       "waiting");
    if (!wlReceivingMonitor_->isActive()) 
      wlReceivingMonitor_->activate();
    asReceiveMsgAndRead_ = 
      toolbox::task::bind(this,&FUEventProcessor::receivingAndMonitor,
                          "ReceivingM");
    wlReceivingMonitor_->submit(asReceiveMsgAndRead_);
    receivingM_ = true;
  }
  catch (xcept::Exception& e) {
    std::string msg = "Failed to start workloop 'ReceivingM'.";
    XCEPT_RETHROW(evf::Exception,msg,e);
  }
}
void FUEventProcessor::startScalersWorkLoop ( ) throw (evf::Exception) [private]

Definition at line 1366 of file FUEventProcessor.cc.

References asScalers_, alignCSCRings::e, Exception, lumiQueryAPI::msg, scalers(), wlScalers_, and wlScalersActive_.

Referenced by enableMPEPSlave(), and forkProcessesFromEDM().

{
  try {
    wlScalers_=
      toolbox::task::getWorkLoopFactory()->getWorkLoop("Scalers",
                                                       "waiting");
    if (!wlScalers_->isActive()) wlScalers_->activate();
    asScalers_ = toolbox::task::bind(this,&FUEventProcessor::scalers,
                                     "Scalers");
    
  wlScalers_->submit(asScalers_);
  wlScalersActive_ = true;
  }
  catch (xcept::Exception& e) {
    std::string msg = "Failed to start workloop 'Scalers'.";
    XCEPT_RETHROW(evf::Exception,msg,e);
  }
}
void FUEventProcessor::startSummarizeWorkLoop ( ) throw (evf::Exception) [private]

Definition at line 1387 of file FUEventProcessor.cc.

References asSummarize_, alignCSCRings::e, Exception, lumiQueryAPI::msg, summarize(), wlSummarize_, and wlSummarizeActive_.

Referenced by enabling().

{
  try {
    wlSummarize_=
      toolbox::task::getWorkLoopFactory()->getWorkLoop("Summary",
                                                       "waiting");
    if (!wlSummarize_->isActive()) wlSummarize_->activate();
    
    asSummarize_ = toolbox::task::bind(this,&FUEventProcessor::summarize,
                                       "Summary");

    wlSummarize_->submit(asSummarize_);
    wlSummarizeActive_ = true;
  }
  catch (xcept::Exception& e) {
    std::string msg = "Failed to start workloop 'Summarize'.";
    XCEPT_RETHROW(evf::Exception,msg,e);
  }
}
void FUEventProcessor::startSupervisorLoop ( ) [private]

Definition at line 993 of file FUEventProcessor.cc.

References asSupervisor_, alignCSCRings::e, Exception, lumiQueryAPI::msg, supervising_, supervisor(), and wlSupervising_.

Referenced by FUEventProcessor().

{
  try {
    wlSupervising_=
      toolbox::task::getWorkLoopFactory()->getWorkLoop("Supervisor",
                                                       "waiting");
    if (!wlSupervising_->isActive()) wlSupervising_->activate();
    asSupervisor_ = toolbox::task::bind(this,&FUEventProcessor::supervisor,
                                        "Supervisor");
    wlSupervising_->submit(asSupervisor_);
    supervising_ = true;
  }
  catch (xcept::Exception& e) {
    std::string msg = "Failed to start workloop 'Supervisor'.";
    XCEPT_RETHROW(evf::Exception,msg,e);
  }
}
bool FUEventProcessor::stopClassic ( ) [private]

Definition at line 2022 of file FUEventProcessor.cc.

References detachDqmFromShm(), alignCSCRings::e, edm::IEventProcessor::epSuccess, edm::IEventProcessor::epTimedOut, evtProcessor_, Exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, hasShMem_, localLog(), reasonForFailedState_, and evf::FWEPWrapper::stop().

Referenced by receiving(), and stopping().

{
  try {
    LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping :) ...");
    edm::EventProcessor::StatusCode rc = evtProcessor_.stop();
    if(rc == edm::EventProcessor::epSuccess) 
      fsm_.fireEvent("StopDone",this);
    else
      {
        //      epMState_ = evtProcessor_->currentStateName();
        if(rc == edm::EventProcessor::epTimedOut)
          reasonForFailedState_ = "EventProcessor stop timed out";
        else
          reasonForFailedState_ = "EventProcessor did not receive STOP event";
        fsm_.fireFailed(reasonForFailedState_,this);
        localLog(reasonForFailedState_);
      }
    if(hasShMem_) detachDqmFromShm();
  }
  catch (xcept::Exception &e) {
    reasonForFailedState_ = "stopping FAILED: " + (std::string)e.what();
    localLog(reasonForFailedState_);
    fsm_.fireFailed(reasonForFailedState_,this);
  }
  LOG4CPLUS_INFO(getApplicationLogger(),"Finished stopping!");
  localLog("-I- Stop completed");
  return false;
}
bool FUEventProcessor::stopping ( toolbox::task::WorkLoop *  wl)

Definition at line 620 of file FUEventProcessor.cc.

References doEndRunInEDM(), forkInEDM_, hasShMem_, nbSubProcesses_, evf::Vulture::stop(), stopClassic(), stopSlavesAndAcknowledge(), and vulture_.

Referenced by supervisor().

{
  if(nbSubProcesses_.value_!=0) {
    stopSlavesAndAcknowledge();
    if (forkInEDM_.value_) doEndRunInEDM();
  }
  vulture_->stop();

  if (forkInEDM_.value_) {
    bool tmpHasShMem_=hasShMem_;
    hasShMem_=false;
    bool stop_status = stopClassic();
    hasShMem_=tmpHasShMem_;
    return stop_status;
  }
  return stopClassic();
}
void FUEventProcessor::stopSlavesAndAcknowledge ( ) [private]

Definition at line 2051 of file FUEventProcessor.cc.

References alignCSCRings::e, i, localLog(), MAX_MSG_SIZE, lumiQueryAPI::msg, MSQM_MESSAGE_TYPE_STOP, MSQS_MESSAGE_TYPE_STOP, nbSubProcesses_, reasonForFailedState_, stop_lock_, and subs_.

Referenced by halting(), and stopping().

{
  MsgBuf msg(0,MSQM_MESSAGE_TYPE_STOP);
  MsgBuf msg1(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_STOP);

  std::vector<bool> processes_to_stop(nbSubProcesses_.value_,false);
  for(unsigned int i = 0; i < subs_.size(); i++)
    {
      pthread_mutex_lock(&stop_lock_);
      if(subs_[i].alive()>0){
        processes_to_stop[i] = true;
        subs_[i].post(msg,false);
      }
      pthread_mutex_unlock(&stop_lock_);
    }
  for(unsigned int i = 0; i < subs_.size(); i++)
    {
      pthread_mutex_lock(&stop_lock_);
      if(processes_to_stop[i]){
        try{
          subs_[i].rcv(msg1,false);
        }
        catch(evf::Exception &e){
          std::ostringstream ost;
          ost << "failed to get STOP - errno ->" << errno << " " << e.what(); 
          reasonForFailedState_ = ost.str();
          LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
          //      fsm_.fireFailed(reasonForFailedState_,this);
          localLog(reasonForFailedState_);
          pthread_mutex_unlock(&stop_lock_);
          continue;
        }
      }
      else {
        pthread_mutex_unlock(&stop_lock_);
        continue;
      }
      pthread_mutex_unlock(&stop_lock_);
      if(msg1->mtype==MSQS_MESSAGE_TYPE_STOP)
        while(subs_[i].alive()>0) ::usleep(10000);
      subs_[i].disconnect();
    }
  //  subs_.clear();

}
void FUEventProcessor::subWeb ( xgi::Input *  in,
xgi::Output *  out 
)

Definition at line 713 of file FUEventProcessor.cc.

References anonymousPipe_, gather_cfg::cout, generateEDF::done, asciidump::els, i, j, Association::map, MAX_MSG_SIZE, MAX_PIPE_BUFFER_SIZE, mod(), lumiQueryAPI::msg, MSGQ_MESSAGE_TYPE_RANGE, MSQM_MESSAGE_TYPE_WEB, MSQS_MESSAGE_TYPE_WEB, evf::utils::pid, csv2json::pieces, PIPE_READ, SiPixelLorentzAngle_cfi::read, stor::utils::sleep(), subs_, and superSleepSec_.

Referenced by FUEventProcessor().

{
  using namespace cgicc;
  pid_t pid = 0;
  std::ostringstream ost;
  ost << "&";

  Cgicc cgi(in);
  internal::MyCgi *mycgi = (internal::MyCgi*)in;
  for(std::map<std::string, std::string, std::less<std::string> >::iterator mit = 
        mycgi->getEnvironment().begin();
      mit != mycgi->getEnvironment().end(); mit++)
    ost << mit->first << "%" << mit->second << ";";
  std::vector<FormEntry> els = cgi.getElements() ;
  std::vector<FormEntry> el1;
  cgi.getElement("method",el1);
  std::vector<FormEntry> el2;
  cgi.getElement("process",el2);
  if(el1.size()!=0) {
    std::string meth = el1[0].getValue();
    if(el2.size()!=0) {
      unsigned int i = 0;
      std::string mod = el2[0].getValue();
      pid = atoi(mod.c_str()); // get the process id to be polled
      for(; i < subs_.size(); i++)
        if(subs_[i].pid()==pid) break;
      if(i>=subs_.size()){ //process was not found, let the browser know
        *out << "ERROR 404 : Process " << pid << " Not Found !" << std::endl;
        return;
      } 
      if(subs_[i].alive() != 1){
        *out << "ERROR 405 : Process " << pid << " Not Alive !" << std::endl;
        return;
      }
      MsgBuf msg1(meth.length()+ost.str().length()+1,MSQM_MESSAGE_TYPE_WEB);
      strncpy(msg1->mtext,meth.c_str(),meth.length());
      strncpy(msg1->mtext+meth.length(),ost.str().c_str(),ost.str().length());
      subs_[i].post(msg1,true);
      unsigned int keep_supersleep_original_value = superSleepSec_.value_;
      superSleepSec_.value_=10*keep_supersleep_original_value;
      MsgBuf msg(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_WEB);
      bool done = false;
      std::vector<char *>pieces;
      while(!done){
        unsigned long retval1 = subs_[i].rcvNonBlocking(msg,true);
        if(retval1 == MSGQ_MESSAGE_TYPE_RANGE){
	  ::sleep(1);
          continue;
        }
        unsigned int nbytes = atoi(msg->mtext);
        if(nbytes < MAX_PIPE_BUFFER_SIZE) done = true; // this will break the while loop
        char *buf= new char[nbytes];
        ssize_t retval = read(anonymousPipe_[PIPE_READ],buf,nbytes);
        if(retval!=nbytes) std::cout 
          << "CAREFUL HERE, read less bytes than expected from pipe in subWeb" << std::endl;
        pieces.push_back(buf);
      }
      superSleepSec_.value_=keep_supersleep_original_value;
      for(unsigned int j = 0; j < pieces.size(); j++){
        *out<<pieces[j];    // chain the buffers into the output strstream
        delete[] pieces[j]; //make sure to release all buffers used for reading the pipe
      }
    }
  }
}
bool FUEventProcessor::summarize ( toolbox::task::WorkLoop *  wl) [private]

Definition at line 1437 of file FUEventProcessor.cc.

References gather_cfg::cout, cpustat_, alignCSCRings::e, evf::TriggerReportStatic::eventSummary, evtProcessor_, evf::FWEPWrapper::fireScalersUpdate(), fsm_, evf::FWEPWrapper::getLumiSectionReferenceIndex(), evf::FWEPWrapper::getPackedTriggerReportAsStruct(), i, iDieStatisticsGathering_, evf::TriggerReportStatic::lumiSection, master_message_trr_, MSQS_MESSAGE_TYPE_TRR, evf::TriggerReportStatic::nbExpected, evf::TriggerReportStatic::nbReporting, nbSubProcesses_, nbSubProcessesReporting_, ratestat_, evf::CPUStat::reset(), evf::FWEPWrapper::resetPackedTriggerReport(), runTheMatrix::ret, evf::CPUStat::sendStat(), evf::RateStat::sendStat(), evf::CPUStat::setNproc(), stor::utils::sleep(), evf::StateMachine::stateName(), subs_, evf::FWEPWrapper::sumAndPackTriggerReport(), edm::EventSummary::totalEvents, evf::FWEPWrapper::updateRollingReport(), evf::FWEPWrapper::withdrawLumiSectionIncrement(), and wlScalersActive_.

Referenced by startSummarizeWorkLoop().

{
  evtProcessor_.resetPackedTriggerReport();
  bool atLeastOneProcessUpdatedSuccessfully = false;
  int msgCount = 0;
  for (unsigned int i = 0; i < subs_.size(); i++)
    {
      if(subs_[i].alive()>0)
        {
          int ret = 0;
          if(subs_[i].check_postponed_trigger_update(master_message_trr_,
                                                     evtProcessor_.getLumiSectionReferenceIndex()))
            {
              ret = MSQS_MESSAGE_TYPE_TRR;
              std::cout << "using postponed report from slot " << i << " for ls " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
            }
          else{
            bool insync = false;
            bool exception_caught = false;
            while(!insync){
              try{
                ret = subs_[i].rcv(master_message_trr_,false);
              }
              catch(evf::Exception &e)
                {
                  std::cout << "exception in msgrcv on " << i 
                            << " " << subs_[i].alive() << " " << strerror(errno) << std::endl;
                  exception_caught = true;
                  break;
                  //do nothing special
                }
              if(ret==MSQS_MESSAGE_TYPE_TRR) {
                TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
                if(trp->lumiSection >= evtProcessor_.getLumiSectionReferenceIndex()){
                  insync = true;
                }
              }
            }
            if(exception_caught) continue;
          }
          msgCount++;
          if(ret==MSQS_MESSAGE_TYPE_TRR) {
            TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
            if(trp->lumiSection > evtProcessor_.getLumiSectionReferenceIndex()){
              std::cout << "postpone handling of msg from slot " << i << " with Ls " <<  trp->lumiSection
                        << " should be " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
              subs_[i].add_postponed_trigger_update(master_message_trr_);
            }else{
              atLeastOneProcessUpdatedSuccessfully = true;
              evtProcessor_.sumAndPackTriggerReport(master_message_trr_);
            }
          }
          else std::cout << "msgrcv returned error " << errno << std::endl;
        }
    }
  if(atLeastOneProcessUpdatedSuccessfully){
    nbSubProcessesReporting_.value_ = msgCount;
    evtProcessor_.getPackedTriggerReportAsStruct()->nbExpected = nbSubProcesses_.value_;
    evtProcessor_.getPackedTriggerReportAsStruct()->nbReporting = nbSubProcessesReporting_.value_;
    evtProcessor_.updateRollingReport();
    evtProcessor_.fireScalersUpdate();
  }
  else{
    LOG4CPLUS_WARN(getApplicationLogger(),"Summarize loop: no process updated successfully - sleep 10 seconds before trying again");          
    if(msgCount==0) evtProcessor_.withdrawLumiSectionIncrement();
    nbSubProcessesReporting_.value_ = 0;
    ::sleep(10);
  }
  if(fsm_.stateName()->toString()!="Enabled"){
    wlScalersActive_ = false;
    return false;
  }
  //  cpustat_->printStat();
  if(iDieStatisticsGathering_.value_){
    try{
      TriggerReportStatic *trsp = evtProcessor_.getPackedTriggerReportAsStruct();
      cpustat_ ->setNproc(trsp->eventSummary.totalEvents);
      cpustat_ ->sendStat(evtProcessor_.getLumiSectionReferenceIndex());
      ratestat_->sendStat((unsigned char*)trsp,
                          sizeof(TriggerReportStatic),
                          evtProcessor_.getLumiSectionReferenceIndex());
    }catch(evf::Exception &e){
      LOG4CPLUS_INFO(getApplicationLogger(),"coud not send statistics"
                     << e.what());
    }
  }
  cpustat_->reset();
  return true;
}
bool FUEventProcessor::supervisor ( toolbox::task::WorkLoop *  wl) [private]

Definition at line 1087 of file FUEventProcessor.cc.

References evf::CPUStat::addEntry(), evf::FWEPWrapper::adjustLsIndexForRestart(), applicationInfoSpace_, autoRestartSlaves_, gather_cfg::cout, cpustat_, evf::StateMachine::disableRcmsStateNotification(), evf::prg::dqm, alignCSCRings::e, edm_init_done_, enableMPEPSlave(), evtProcessor_, exception, Exception, spr::find(), evf::StateMachine::fireEvent(), forkInEDM_, fsm_, i, localLog(), log_, evf::prg::ls, python::rootplot::utilities::ls(), master_message_prg_, master_message_prr_, evf::FWEPWrapper::moduleNameFromIndex(), monitorInfoSpace_, evf::prg::Ms, evf::prg::ms, MSQM_MESSAGE_TYPE_FSTOP, myProcess_, names_, nbAccepted, nbdead_, nblive_, nbProcessed, nbSubProcesses_, nbTotalDQM_, evf::FWEPWrapper::notstarted_state_code(), NUMERIC_MESSAGE_SIZE, AlCaHLTBitMon_ParallelJobs::p, pickup_lock_, evf::prg::ps, evf::FWEPWrapper::resetPackedTriggerReport(), restartForkInEDM(), findQualityFiles::rr, scalersUpdates_, edm::event_processor::sError, edm::event_processor::sInit, edm::event_processor::sInvalid, slaveRestartDelaySecs_, stor::utils::sleep(), spMStates_, spmStates_, edm::event_processor::sStopping, evf::StateMachine::stateName(), evf::FWEPWrapper::stateNameFromIndex(), stop_lock_, stopping(), subs_, superSleepSec_, and evf::prg::trp.

Referenced by startSupervisorLoop().

{
  pthread_mutex_lock(&stop_lock_);
  if(subs_.size()!=nbSubProcesses_.value_)
    {
      pthread_mutex_lock(&pickup_lock_);
      if(subs_.size()!=nbSubProcesses_.value_) {
        subs_.resize(nbSubProcesses_.value_);
        spMStates_.resize(nbSubProcesses_.value_);
        spmStates_.resize(nbSubProcesses_.value_);
        for(unsigned int i = 0; i < spMStates_.size(); i++)
          {
            spMStates_[i] = edm::event_processor::sInit; 
            spmStates_[i] = 0; 
          }
      }
      pthread_mutex_unlock(&pickup_lock_);
    }
  bool running = fsm_.stateName()->toString()=="Enabled";
  bool stopping = fsm_.stateName()->toString()=="stopping";
  for(unsigned int i = 0; i < subs_.size(); i++)
    {
      if(subs_[i].alive()==-1000) continue;
      int sl;
      pid_t sub_pid = subs_[i].pid();
      pid_t killedOrNot = waitpid(sub_pid,&sl,WNOHANG);

      if(killedOrNot && killedOrNot==sub_pid) {
        pthread_mutex_lock(&pickup_lock_);
        //check if out of range or recreated (enable can clear vector)
        if (i<subs_.size() && subs_[i].alive()!=-1000) {
          subs_[i].setStatus((WIFEXITED(sl) != 0 ? 0 : -1));
          std::ostringstream ost;
          if(subs_[i].alive()==0) ost << " process exited with status " << WEXITSTATUS(sl);
          else if(WIFSIGNALED(sl)!=0) ost << " process terminated with signal " << WTERMSIG(sl);
          else ost << " process stopped ";
          subs_[i].countdown()=slaveRestartDelaySecs_.value_;
          subs_[i].setReasonForFailed(ost.str());
          spMStates_[i] = evtProcessor_.notstarted_state_code();
          spmStates_[i] = 0;
          std::ostringstream ost1;
          ost1 << "-E- Slave " << subs_[i].pid() << ost.str();
          localLog(ost1.str());
          if(!autoRestartSlaves_.value_) subs_[i].disconnect();
        }
        pthread_mutex_unlock(&pickup_lock_);
      }
    }
  pthread_mutex_unlock(&stop_lock_);    
  if(stopping) return true; // if in stopping we are done

  if(running && edm_init_done_)
    {
      // if enabled, this loop will periodically check if dead slaves countdown has expired and restart them
      // this is only active while running, hence, the stop lock is acquired and only released at end of loop
      if(autoRestartSlaves_.value_){
        pthread_mutex_lock(&stop_lock_); //lockout slave killing at stop while you check for restarts
        for(unsigned int i = 0; i < subs_.size(); i++)
          {
            if(subs_[i].alive() != 1){
              if(subs_[i].countdown() == 0)
                {
                  if(subs_[i].restartCount()>2){
                    LOG4CPLUS_WARN(getApplicationLogger()," Not restarting subprocess in slot " << i 
                                   << " - maximum restart count reached");
                    std::ostringstream ost1;
                    ost1 << "-W- Dead Process in slot " << i << " reached maximum restart count"; 
                    localLog(ost1.str());
                    subs_[i].countdown()--;
                    XCEPT_DECLARE(evf::Exception,
                                  sentinelException, ost1.str());
                    notifyQualified("error",sentinelException);
                    subs_[i].disconnect();
                    continue;
                  }
                  subs_[i].restartCount()++;
                  if (forkInEDM_.value_) {
                    restartForkInEDM(i);
                  }
                  else {
                    pid_t rr = subs_[i].forkNew();
                    if(rr==0)
                    {
                      myProcess_=&subs_[i];
                      scalersUpdates_ = 0;
                      int retval = pthread_mutex_destroy(&stop_lock_);
                      if(retval != 0) perror("error");
                      retval = pthread_mutex_init(&stop_lock_,0);
                      if(retval != 0) perror("error");
                      fsm_.disableRcmsStateNotification();
                      fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
                      fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition
                      fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition
                      try{
                        xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
                        if(lsid) {
                          ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died
                        }
                      }
                      catch(...){
                        std::cout << "trouble with lsindex during restart" << std::endl;
                      }
                      try{
                        xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
                        if(lstb) {
                          ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting
                        }
                      }
                      catch(...){
                        std::cout << "trouble with resetting flag for eol recovery " << std::endl;
                      }

                      evtProcessor_.adjustLsIndexForRestart();
                      evtProcessor_.resetPackedTriggerReport();
                      enableMPEPSlave();
                      return false; // exit the supervisor loop immediately in the child !!!
                    }
                  else
                    {
                      std::ostringstream ost1;
                      ost1 << "-I- New Process " << rr << " forked for slot " << i; 
                      localLog(ost1.str());
                    }
                  }
                }
              if(subs_[i].countdown()>=0) subs_[i].countdown()--;
            }
          }
        pthread_mutex_unlock(&stop_lock_);
      } // finished handling replacement of dead slaves once they've been reaped
    }
  xdata::Serializable *lsid = 0; 
  xdata::Serializable *psid = 0;
  xdata::Serializable *dqmp = 0;
  xdata::UnsignedInteger32 *dqm = 0;


  
  if(running && edm_init_done_){  
    try{
      lsid = applicationInfoSpace_->find("lumiSectionIndex");
      psid = applicationInfoSpace_->find("prescaleSetIndex");
      nbProcessed = monitorInfoSpace_->find("nbProcessed");
      nbAccepted  = monitorInfoSpace_->find("nbAccepted");
      dqmp = applicationInfoSpace_-> find("nbDqmUpdates");      
    }
    catch(xdata::exception::Exception e){
      LOG4CPLUS_INFO(getApplicationLogger(),"could not retrieve some data - " << e.what());    
    }

    try{
      if(nbProcessed !=0 && nbAccepted !=0)
        {
          xdata::UnsignedInteger32*nbp = ((xdata::UnsignedInteger32*)nbProcessed);
          xdata::UnsignedInteger32*nba = ((xdata::UnsignedInteger32*)nbAccepted);
          xdata::UnsignedInteger32*ls  = ((xdata::UnsignedInteger32*)lsid);
          xdata::UnsignedInteger32*ps  = ((xdata::UnsignedInteger32*)psid);
          if(dqmp!=0)
            dqm = (xdata::UnsignedInteger32*)dqmp;
          if(dqm) dqm->value_ = 0;
          nbTotalDQM_ = 0;
          nbp->value_ = 0;
          nba->value_ = 0;
          nblive_ = 0;
          nbdead_ = 0;
          scalersUpdates_ = 0;

          for(unsigned int i = 0; i < subs_.size(); i++)
            {
              if(subs_[i].alive()>0)
                {
                  nblive_++;
                  try{
                    subs_[i].post(master_message_prg_,true);
                    
                    unsigned long retval = subs_[i].rcvNonBlocking(master_message_prr_,true);
                    if(retval == (unsigned long) master_message_prr_->mtype){
                      prg* p = (struct prg*)(master_message_prr_->mtext);
                      subs_[i].setParams(p);
                      spMStates_[i] = p->Ms;
                      spmStates_[i] = p->ms;
                      cpustat_->addEntry(p->ms);
                      if(!subs_[i].inInconsistentState() && 
                         (p->Ms == edm::event_processor::sError 
                          || p->Ms == edm::event_processor::sInvalid
                          || p->Ms == edm::event_processor::sStopping))
                        {
                          std::ostringstream ost;
                          ost << "edm::eventprocessor slot " << i << " process id " 
                              << subs_[i].pid() << " not in Running state : Mstate=" 
                              << evtProcessor_.stateNameFromIndex(p->Ms) << " mstate="
                              << evtProcessor_.moduleNameFromIndex(p->ms) 
                              << " - Look into possible error messages from HLT process";
                          LOG4CPLUS_WARN(getApplicationLogger(),ost.str());
                        }
                      nbp->value_ += subs_[i].params().nbp;
                      nba->value_  += subs_[i].params().nba;
                      if(dqm)dqm->value_ += p->dqm;
                      nbTotalDQM_ +=  p->dqm;
                      scalersUpdates_ += p->trp;
                      if(p->ls > ls->value_) ls->value_ = p->ls;
                      if(p->ps != ps->value_) ps->value_ = p->ps;
                    }
                    else{
                      nbp->value_ += subs_[i].get_save_nbp();
                      nba->value_ += subs_[i].get_save_nba();
                    }
                  } 
                  catch(evf::Exception &e){
                    LOG4CPLUS_INFO(getApplicationLogger(),
                                   "could not send/receive msg on slot " 
                                   << i << " - " << e.what());    
                  }
                    
                }
              else
                {
                  nbp->value_ += subs_[i].get_save_nbp();
                  nba->value_ += subs_[i].get_save_nba();
                  nbdead_++;
                }
            }
          if(nbp->value_>64){//have some slaves already processed more than one event ? (eventually make this == number of raw cells)
            for(unsigned int i = 0; i < subs_.size(); i++)
              {
                if(subs_[i].params().nbp == 0){ // a slave has processed 0 events 
                  // check that the process is not stuck
                  if(subs_[i].alive()>0 && subs_[i].params().ms == 0) // the process is seen alive but in us=Invalid(0)
                    {
                      subs_[i].found_invalid();//increase the "found_invalid" counter
                      if(subs_[i].nfound_invalid() > 60){ //wait x monitor cycles (~1 min a good time ?) before doing something about a stuck slave
                        MsgBuf msg3(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_FSTOP);      // send a force-stop signal             
                        subs_[i].post(msg3,false);
                        std::ostringstream ost1;
                        ost1 << "-W- Process in slot " << i << " Never reached the running state - forcestopping it"; 
                        localLog(ost1.str());
                        LOG4CPLUS_ERROR(getApplicationLogger(),ost1.str());    
                        XCEPT_DECLARE(evf::Exception,
                                      sentinelException, ost1.str());
                        notifyQualified("error",sentinelException);

                      }
                    }
                }
              }
          }
        }
    }
    catch(std::exception &e){
      LOG4CPLUS_INFO(getApplicationLogger(),"std exception - " << e.what());    
    }
    catch(...){
      LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception ");    
    }
  }
  else{
    for(unsigned int i = 0; i < subs_.size(); i++)
      {
        if(subs_[i].alive()==-1000)
          {
            spMStates_[i] = edm::event_processor::sInit;
            spmStates_[i] = 0;
          }
      }
  }
  try{
    monitorInfoSpace_->lock();
    monitorInfoSpace_->fireItemGroupChanged(names_,0);
    monitorInfoSpace_->unlock();
  }
  catch(xdata::exception::Exception &e)
    {
      LOG4CPLUS_ERROR(log_, "Exception from fireItemGroupChanged: " << e.what());
      //        localLog(e.what());
    }
  ::sleep(superSleepSec_.value_);       
  return true;
}
void FUEventProcessor::updater ( xgi::Input *  in,
xgi::Output *  out 
)

Definition at line 2208 of file FUEventProcessor.cc.

References evf::lsTriplet::acc, evf::utils::cDiv(), configString_, cpustat_, evtProcessor_, fsm_, evf::CPUStat::getChart(), evf::Vulture::hasStarted(), i, iDieUrl_, evf::FWEPWrapper::lastLumi(), logRing_, logRingIndex_, logRingSize_, logWrap_, evf::lsTriplet::ls, evf::utils::mDiv(), myProcess_, nbAccepted, nbProcessed, nbSubProcesses_, nbSubProcessesReporting_, evf::lsTriplet::proc, runNumber_, squidPresent_, evf::StateMachine::stateName(), supervising_, updaterStatic_, evf::utils::uptime(), vp_, vulture_, evf::FWEPWrapper::wlMonitoring(), wlScalersActive_, and wlSummarizeActive_.

Referenced by FUEventProcessor().

{
  using namespace utils;

  *out << updaterStatic_;
  mDiv(out,"loads");
  uptime(out);
  cDiv(out);
  mDiv(out,"st",fsm_.stateName()->toString());
  mDiv(out,"ru",runNumber_.toString());
  mDiv(out,"nsl",nbSubProcesses_.value_);
  mDiv(out,"nsr",nbSubProcessesReporting_.value_);
  mDiv(out,"cl");
  *out << getApplicationDescriptor()->getClassName() 
       << (nbSubProcesses_.value_ > 0 ? "MP " : " ");
  cDiv(out);
  mDiv(out,"in",getApplicationDescriptor()->getInstance());
  if(fsm_.stateName()->toString() != "Halted" && fsm_.stateName()->toString() != "halting"){
    mDiv(out,"hlt");
    *out << "<a href=\"" << configString_.toString() << "\">HLT Config</a>";
    cDiv(out);
    *out << std::endl;
  }
  else
    mDiv(out,"hlt","Not yet...");

  mDiv(out,"sq",squidPresent_.toString());
  mDiv(out,"vwl",(supervising_ ? "Active" : "Not Initialized"));
  mDiv(out,"mwl",evtProcessor_.wlMonitoring());
  if(nbProcessed != 0 && nbAccepted != 0)
    {
      mDiv(out,"tt",((xdata::UnsignedInteger32*)nbProcessed)->value_);
      mDiv(out,"ac",((xdata::UnsignedInteger32*)nbAccepted)->value_);
    }
  else
    {
      mDiv(out,"tt",0);
      mDiv(out,"ac",0);
    }
  if(!myProcess_)
    mDiv(out,"swl",(wlSummarizeActive_ ? "Active" : "Inactive"));
  else
    mDiv(out,"swl",(wlScalersActive_ ? "Active" : "Inactive"));

  mDiv(out,"idi",iDieUrl_.value_);
  if(vp_!=0){
    mDiv(out,"vpi",(unsigned int) vp_);
    if(vulture_->hasStarted()>=0)
      mDiv(out,"vul","Prowling");
    else
      mDiv(out,"vul","Dead");
  }
  else{
    mDiv(out,"vul",(vulture_==0 ? "Nope" : "Hatching"));
  }    
  if(evtProcessor_){
    mDiv(out,"ll");
    *out << evtProcessor_.lastLumi().ls
         << "," << evtProcessor_.lastLumi().proc << "," << evtProcessor_.lastLumi().acc;
    cDiv(out);
  }
  mDiv(out,"lg");
  for(unsigned int i = logRingIndex_; i<logRingSize_; i++)
    *out << logRing_[i] << std::endl;
  if(logWrap_)
    for(unsigned int i = 0; i<logRingIndex_; i++)
      *out << logRing_[i] << std::endl;
  cDiv(out);
  mDiv(out,"cha");
  if(cpustat_) *out << cpustat_->getChart();
  cDiv(out);
}
evf::FUEventProcessor::XDAQ_INSTANTIATOR ( )

Member Data Documentation

Definition at line 258 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), receivingAndMonitor(), and subWeb().

xdata::InfoSpace* evf::FUEventProcessor::applicationInfoSpace_ [private]
toolbox::task::ActionSignature* evf::FUEventProcessor::asReceiveMsgAndExecute_ [private]

Definition at line 224 of file FUEventProcessor.h.

Referenced by startReceivingLoop().

toolbox::task::ActionSignature* evf::FUEventProcessor::asReceiveMsgAndRead_ [private]

Definition at line 227 of file FUEventProcessor.h.

Referenced by startReceivingMonitorLoop().

toolbox::task::ActionSignature* evf::FUEventProcessor::asScalers_ [private]

Definition at line 250 of file FUEventProcessor.h.

Referenced by startScalersWorkLoop().

toolbox::task::ActionSignature* evf::FUEventProcessor::asSummarize_ [private]

Definition at line 256 of file FUEventProcessor.h.

Referenced by startSummarizeWorkLoop().

toolbox::task::ActionSignature* evf::FUEventProcessor::asSupervisor_ [private]

Definition at line 231 of file FUEventProcessor.h.

Referenced by startSupervisorLoop().

Definition at line 180 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), microState(), and supervisor().

xdata::String evf::FUEventProcessor::class_ [private]

Definition at line 171 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

xdata::String evf::FUEventProcessor::configString_ [private]

Definition at line 175 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and updater().

Definition at line 176 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and spotlightWebPage().

Definition at line 266 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), summarize(), supervisor(), and updater().

Definition at line 202 of file FUEventProcessor.h.

Referenced by css().

Definition at line 279 of file FUEventProcessor.h.

Referenced by doEndRunInEDM(), enabling(), forkProcessesFromEDM(), and supervisor().

xdata::Boolean evf::FUEventProcessor::epInitialized_ [private]

Definition at line 174 of file FUEventProcessor.h.

Referenced by actionPerformed(), configuring(), enabling(), and FUEventProcessor().

xdata::Boolean evf::FUEventProcessor::exitOnError_ [private]

Definition at line 199 of file FUEventProcessor.h.

Referenced by receivingAndMonitor().

xdata::UnsignedInteger32 evf::FUEventProcessor::forkInEDM_ [private]

Definition at line 215 of file FUEventProcessor.h.

Referenced by enabling(), FUEventProcessor(), halting(), stopping(), and supervisor().

pthread_mutex_t evf::FUEventProcessor::forkObjLock_ [private]

Definition at line 278 of file FUEventProcessor.h.

Referenced by enableForkInEDM(), and FUEventProcessor().

Definition at line 185 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and makeStaticInfo().

Definition at line 184 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and FUEventProcessor().

Definition at line 186 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and makeStaticInfo().

xdata::Boolean evf::FUEventProcessor::hasShMem_ [private]

Definition at line 188 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and summarize().

xdata::String evf::FUEventProcessor::iDieUrl_ [private]

Definition at line 263 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and updater().

xdata::UnsignedInteger32 evf::FUEventProcessor::instance_ [private]

Definition at line 172 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and FUEventProcessor().

Definition at line 187 of file FUEventProcessor.h.

Referenced by enableCommon(), enableForkInEDM(), and FUEventProcessor().

Logger evf::FUEventProcessor::log_ [private]

Definition at line 164 of file FUEventProcessor.h.

Referenced by doEndRunInEDM(), enableForkInEDM(), restartForkInEDM(), and supervisor().

std::vector<std::string> evf::FUEventProcessor::logRing_ [private]

Definition at line 208 of file FUEventProcessor.h.

Referenced by localLog(), logsAsString(), and updater().

unsigned int evf::FUEventProcessor::logRingIndex_ [private]

Definition at line 209 of file FUEventProcessor.h.

Referenced by localLog(), logsAsString(), and updater().

const unsigned int evf::FUEventProcessor::logRingSize_ = 50 [static, private]

Definition at line 210 of file FUEventProcessor.h.

Referenced by localLog(), and updater().

Definition at line 211 of file FUEventProcessor.h.

Referenced by localLog(), logsAsString(), and updater().

Definition at line 271 of file FUEventProcessor.h.

Referenced by supervisor().

Definition at line 272 of file FUEventProcessor.h.

Referenced by supervisor().

Definition at line 275 of file FUEventProcessor.h.

Referenced by summarize().

xdata::InfoSpace* evf::FUEventProcessor::monitorInfoSpace_ [private]

Definition at line 234 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), and supervisor().

Definition at line 235 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

Definition at line 269 of file FUEventProcessor.h.

Referenced by configuring(), enableForkInEDM(), and enabling().

std::list<std::string> evf::FUEventProcessor::names_ [private]

Definition at line 262 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), and supervisor().

xdata::Serializable* evf::FUEventProcessor::nbAccepted [private]

Definition at line 242 of file FUEventProcessor.h.

Referenced by supervisor(), and updater().

unsigned int evf::FUEventProcessor::nbdead_ [private]

Definition at line 218 of file FUEventProcessor.h.

Referenced by microState(), and supervisor().

unsigned int evf::FUEventProcessor::nblive_ [private]

Definition at line 217 of file FUEventProcessor.h.

Referenced by microState(), and supervisor().

xdata::Serializable* evf::FUEventProcessor::nbProcessed [private]

Definition at line 241 of file FUEventProcessor.h.

Referenced by supervisor(), and updater().

xdata::UnsignedInteger32 evf::FUEventProcessor::nbSubProcesses_ [private]
xdata::UnsignedInteger32 evf::FUEventProcessor::nbSubProcessesReporting_ [private]

Definition at line 214 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), summarize(), and updater().

unsigned int evf::FUEventProcessor::nbTotalDQM_ [private]

Definition at line 220 of file FUEventProcessor.h.

Referenced by enabling(), microState(), and supervisor().

Definition at line 189 of file FUEventProcessor.h.

Referenced by actionPerformed().

xdata::Boolean evf::FUEventProcessor::outPut_ [private]

Definition at line 178 of file FUEventProcessor.h.

Referenced by actionPerformed(), FUEventProcessor(), and makeStaticInfo().

pthread_mutex_t evf::FUEventProcessor::pickup_lock_ [private]

Definition at line 239 of file FUEventProcessor.h.

Referenced by enabling(), FUEventProcessor(), microState(), and supervisor().

Definition at line 267 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and summarize().

Definition at line 225 of file FUEventProcessor.h.

Referenced by startReceivingLoop().

Definition at line 228 of file FUEventProcessor.h.

Referenced by startReceivingMonitorLoop().

xdata::UnsignedInteger32 evf::FUEventProcessor::runNumber_ [private]
xdata::InfoSpace* evf::FUEventProcessor::scalersInfoSpace_ [private]

Definition at line 245 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

Definition at line 246 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and pathNames().

Definition at line 274 of file FUEventProcessor.h.

Referenced by receivingAndMonitor().

Definition at line 273 of file FUEventProcessor.h.

Referenced by receivingAndMonitor().

xdata::UnsignedInteger32 evf::FUEventProcessor::slaveRestartDelaySecs_ [private]

Definition at line 181 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), and supervisor().

Definition at line 270 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and forkProcessesFromEDM().

std::string evf::FUEventProcessor::sourceId_ [private]

Definition at line 192 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

Definition at line 260 of file FUEventProcessor.h.

Referenced by configuring(), FUEventProcessor(), and supervisor().

Definition at line 259 of file FUEventProcessor.h.

Referenced by configuring(), FUEventProcessor(), and supervisor().

Definition at line 207 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

xdata::Boolean evf::FUEventProcessor::squidPresent_ [private]

Definition at line 195 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), and updater().

pthread_mutex_t evf::FUEventProcessor::start_lock_ [private]

Definition at line 238 of file FUEventProcessor.h.

Referenced by enabling(), FUEventProcessor(), and microState().

pthread_mutex_t evf::FUEventProcessor::stop_lock_ [private]
std::vector<SubProcess> evf::FUEventProcessor::subs_ [private]
xdata::UnsignedInteger32 evf::FUEventProcessor::superSleepSec_ [private]

Definition at line 261 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), subWeb(), and supervisor().

Definition at line 232 of file FUEventProcessor.h.

Referenced by startSupervisorLoop(), and updater().

Definition at line 240 of file FUEventProcessor.h.

Referenced by makeStaticInfo(), and updater().

xdata::String evf::FUEventProcessor::url_ [private]

Definition at line 170 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

pid_t evf::FUEventProcessor::vp_ [private]

Definition at line 265 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and updater().

toolbox::task::WorkLoop* evf::FUEventProcessor::wlReceiving_ [private]

Definition at line 223 of file FUEventProcessor.h.

Referenced by startReceivingLoop().

toolbox::task::WorkLoop* evf::FUEventProcessor::wlReceivingMonitor_ [private]

Definition at line 226 of file FUEventProcessor.h.

Referenced by startReceivingMonitorLoop().

toolbox::task::WorkLoop* evf::FUEventProcessor::wlScalers_ [private]

Definition at line 249 of file FUEventProcessor.h.

Referenced by startScalersWorkLoop().

Definition at line 251 of file FUEventProcessor.h.

Referenced by scalers(), startScalersWorkLoop(), summarize(), and updater().

toolbox::task::WorkLoop* evf::FUEventProcessor::wlSummarize_ [private]

Definition at line 255 of file FUEventProcessor.h.

Referenced by startSummarizeWorkLoop().

Definition at line 257 of file FUEventProcessor.h.

Referenced by startSummarizeWorkLoop(), and updater().

toolbox::task::WorkLoop* evf::FUEventProcessor::wlSupervising_ [private]

Definition at line 230 of file FUEventProcessor.h.

Referenced by startSupervisorLoop().