00001
00002
00003
00004
00005
00007
00008 #include "FUEventProcessor.h"
00009 #include "procUtils.h"
00010 #include "EventFilter/Utilities/interface/CPUStat.h"
00011 #include "EventFilter/Utilities/interface/RateStat.h"
00012
00013 #include "EventFilter/Utilities/interface/Exception.h"
00014
00015 #include "EventFilter/Message2log4cplus/interface/MLlog4cplus.h"
00016 #include "EventFilter/Modules/interface/FUShmDQMOutputService.h"
00017 #include "EventFilter/Utilities/interface/ServiceWebRegistry.h"
00018 #include "EventFilter/Utilities/interface/ServiceWeb.h"
00019 #include "EventFilter/Utilities/interface/ModuleWeb.h"
00020 #include "EventFilter/Utilities/interface/ModuleWebRegistry.h"
00021 #include "EventFilter/Modules/src/FUShmOutputModule.h"
00022
00023 #include "FWCore/PluginManager/interface/ProblemTracker.h"
00024 #include "FWCore/PluginManager/interface/PresenceFactory.h"
00025 #include "FWCore/Utilities/interface/Presence.h"
00026 #include "FWCore/Utilities/interface/Exception.h"
00027 #include "FWCore/Version/interface/GetReleaseVersion.h"
00028 #include "FWCore/ServiceRegistry/interface/Service.h"
00029
00030 #include "toolbox/BSem.h"
00031
00032 #include <boost/tokenizer.hpp>
00033
00034 #include "xcept/tools.h"
00035 #include "xgi/Method.h"
00036
00037 #include "cgicc/CgiDefs.h"
00038 #include "cgicc/Cgicc.h"
00039 #include "cgicc/FormEntry.h"
00040
00041
00042 #include <sys/wait.h>
00043 #include <sys/utsname.h>
00044
00045 #include <typeinfo>
00046 #include <stdlib.h>
00047 #include <stdio.h>
00048 #include <errno.h>
00049
00050 using namespace evf;
00051 using namespace cgicc;
00052 namespace toolbox {
00053 namespace mem {
00054 extern toolbox::BSem * _s_mutex_ptr_;
00055 }
00056 }
00057
00059
00061
00062
00063 FUEventProcessor::FUEventProcessor(xdaq::ApplicationStub *s)
00064 : xdaq::Application(s)
00065 , fsm_(this)
00066 , log_(getApplicationLogger())
00067 , evtProcessor_(log_, getApplicationDescriptor()->getInstance())
00068 , runNumber_(0)
00069 , epInitialized_(false)
00070 , outPut_(true)
00071 , autoRestartSlaves_(false)
00072 , slaveRestartDelaySecs_(10)
00073 , hasShMem_(true)
00074 , hasPrescaleService_(true)
00075 , hasModuleWebRegistry_(true)
00076 , hasServiceWebRegistry_(true)
00077 , isRunNumberSetter_(true)
00078 , iDieStatisticsGathering_(false)
00079 , outprev_(true)
00080 , exitOnError_(true)
00081 , reasonForFailedState_()
00082 , squidnet_(3128,"https://localhost:8000/RELEASE-NOTES.txt")
00083 , logRing_(logRingSize_)
00084 , logRingIndex_(logRingSize_)
00085 , logWrap_(false)
00086 , nbSubProcesses_(0)
00087 , nbSubProcessesReporting_(0)
00088 , forkInEDM_(true)
00089 , nblive_(0)
00090 , nbdead_(0)
00091 , nbTotalDQM_(0)
00092 , wlReceiving_(0)
00093 , asReceiveMsgAndExecute_(0)
00094 , receiving_(false)
00095 , wlReceivingMonitor_(0)
00096 , asReceiveMsgAndRead_(0)
00097 , receivingM_(false)
00098 , myProcess_(0)
00099 , wlSupervising_(0)
00100 , asSupervisor_(0)
00101 , supervising_(false)
00102 , monitorInfoSpace_(0)
00103 , monitorLegendaInfoSpace_(0)
00104 , applicationInfoSpace_(0)
00105 , nbProcessed(0)
00106 , nbAccepted(0)
00107 , scalersInfoSpace_(0)
00108 , scalersLegendaInfoSpace_(0)
00109 , wlScalers_(0)
00110 , asScalers_(0)
00111 , wlScalersActive_(false)
00112 , scalersUpdates_(0)
00113 , wlSummarize_(0)
00114 , asSummarize_(0)
00115 , wlSummarizeActive_(false)
00116 , superSleepSec_(1)
00117 , iDieUrl_("none")
00118 , vulture_(0)
00119 , vp_(0)
00120 , cpustat_(0)
00121 , ratestat_(0)
00122 , mwrRef_(nullptr)
00123 , sorRef_(nullptr)
00124 , master_message_prg_(0,MSQM_MESSAGE_TYPE_PRG)
00125 , master_message_prr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_PRR)
00126 , slave_message_prr_(sizeof(prg),MSQS_MESSAGE_TYPE_PRR)
00127 , master_message_trr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_TRR)
00128 , edm_init_done_(true)
00129 {
00130 using namespace utils;
00131
00132 names_.push_back("nbProcessed" );
00133 names_.push_back("nbAccepted" );
00134 names_.push_back("epMacroStateInt");
00135 names_.push_back("epMicroStateInt");
00136
00137 int retpipe = pipe(anonymousPipe_);
00138 if(retpipe != 0)
00139 LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to create pipe");
00140
00141 squidPresent_ = squidnet_.check();
00142
00143 evtProcessor_.setAppDesc(getApplicationDescriptor());
00144 evtProcessor_.setAppCtxt(getApplicationContext());
00145
00146 fsm_.initialize<evf::FUEventProcessor>(this);
00147
00148
00149 url_ =
00150 getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00151 getApplicationDescriptor()->getURN();
00152 class_ =getApplicationDescriptor()->getClassName();
00153 instance_=getApplicationDescriptor()->getInstance();
00154 sourceId_=class_.toString()+"_"+instance_.toString();
00155 LOG4CPLUS_INFO(getApplicationLogger(),sourceId_ <<" constructor" );
00156 LOG4CPLUS_INFO(getApplicationLogger(),"CMSSW_BASE:"<<getenv("CMSSW_BASE"));
00157
00158 getApplicationDescriptor()->setAttribute("icon", "/evf/images/epicon.jpg");
00159
00160 xdata::InfoSpace *ispace = getApplicationInfoSpace();
00161 applicationInfoSpace_ = ispace;
00162
00163
00164 ispace->fireItemAvailable("parameterSet", &configString_ );
00165 ispace->fireItemAvailable("epInitialized", &epInitialized_ );
00166 ispace->fireItemAvailable("stateName", fsm_.stateName() );
00167 ispace->fireItemAvailable("runNumber", &runNumber_ );
00168 ispace->fireItemAvailable("outputEnabled", &outPut_ );
00169
00170 ispace->fireItemAvailable("hasSharedMemory", &hasShMem_);
00171 ispace->fireItemAvailable("hasPrescaleService", &hasPrescaleService_ );
00172 ispace->fireItemAvailable("hasModuleWebRegistry", &hasModuleWebRegistry_ );
00173 ispace->fireItemAvailable("hasServiceWebRegistry", &hasServiceWebRegistry_ );
00174 ispace->fireItemAvailable("isRunNumberSetter", &isRunNumberSetter_ );
00175 ispace->fireItemAvailable("iDieStatisticsGathering", &iDieStatisticsGathering_);
00176 ispace->fireItemAvailable("rcmsStateListener", fsm_.rcmsStateListener() );
00177 ispace->fireItemAvailable("foundRcmsStateListener",fsm_.foundRcmsStateListener());
00178 ispace->fireItemAvailable("nbSubProcesses", &nbSubProcesses_ );
00179 ispace->fireItemAvailable("nbSubProcessesReporting",&nbSubProcessesReporting_ );
00180 ispace->fireItemAvailable("forkInEDM" ,&forkInEDM_ );
00181 ispace->fireItemAvailable("superSleepSec", &superSleepSec_ );
00182 ispace->fireItemAvailable("autoRestartSlaves", &autoRestartSlaves_ );
00183 ispace->fireItemAvailable("slaveRestartDelaySecs",&slaveRestartDelaySecs_ );
00184 ispace->fireItemAvailable("iDieUrl", &iDieUrl_ );
00185
00186
00187 getApplicationInfoSpace()->addItemChangedListener("parameterSet", this);
00188 getApplicationInfoSpace()->addItemChangedListener("outputEnabled", this);
00189
00190
00191 fsm_.findRcmsStateListener();
00192
00193
00194
00195 std::string monInfoSpaceName="evf-eventprocessor-status-monitor";
00196 toolbox::net::URN urn = this->createQualifiedInfoSpace(monInfoSpaceName);
00197 monitorInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00198
00199 std::string monLegendaInfoSpaceName="evf-eventprocessor-status-legenda";
00200 urn = this->createQualifiedInfoSpace(monLegendaInfoSpaceName);
00201 monitorLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00202
00203
00204 monitorInfoSpace_->fireItemAvailable("url", &url_ );
00205 monitorInfoSpace_->fireItemAvailable("class", &class_ );
00206 monitorInfoSpace_->fireItemAvailable("instance", &instance_ );
00207 monitorInfoSpace_->fireItemAvailable("runNumber", &runNumber_ );
00208 monitorInfoSpace_->fireItemAvailable("stateName", fsm_.stateName());
00209
00210 monitorInfoSpace_->fireItemAvailable("squidPresent", &squidPresent_ );
00211
00212 std::string scalersInfoSpaceName="evf-eventprocessor-scalers-monitor";
00213 urn = this->createQualifiedInfoSpace(scalersInfoSpaceName);
00214 scalersInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00215
00216 std::string scalersLegendaInfoSpaceName="evf-eventprocessor-scalers-legenda";
00217 urn = this->createQualifiedInfoSpace(scalersLegendaInfoSpaceName);
00218 scalersLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00219
00220
00221
00222 evtProcessor_.setScalersInfoSpace(scalersInfoSpace_,scalersLegendaInfoSpace_);
00223 scalersInfoSpace_->fireItemAvailable("instance", &instance_);
00224
00225 evtProcessor_.setApplicationInfoSpace(ispace);
00226 evtProcessor_.setMonitorInfoSpace(monitorInfoSpace_,monitorLegendaInfoSpace_);
00227 evtProcessor_.publishConfigAndMonitorItems(nbSubProcesses_.value_!=0);
00228
00229
00230 monitorInfoSpace_->fireItemAvailable("epMacroStateInt", &spMStates_);
00231 monitorInfoSpace_->fireItemAvailable("epMicroStateInt", &spmStates_);
00232
00233
00234 xgi::bind(this, &FUEventProcessor::css, "styles.css");
00235 xgi::bind(this, &FUEventProcessor::defaultWebPage, "Default" );
00236 xgi::bind(this, &FUEventProcessor::spotlightWebPage, "Spotlight" );
00237 xgi::bind(this, &FUEventProcessor::scalersWeb, "scalersWeb");
00238 xgi::bind(this, &FUEventProcessor::pathNames, "pathNames" );
00239 xgi::bind(this, &FUEventProcessor::subWeb, "SubWeb" );
00240 xgi::bind(this, &FUEventProcessor::moduleWeb, "moduleWeb" );
00241 xgi::bind(this, &FUEventProcessor::serviceWeb, "serviceWeb");
00242 xgi::bind(this, &FUEventProcessor::microState, "microState");
00243 xgi::bind(this, &FUEventProcessor::updater, "updater" );
00244 xgi::bind(this, &FUEventProcessor::procStat, "procStat" );
00245
00246
00247
00248 edm::AssertHandler ah;
00249
00250 try{
00251 LOG4CPLUS_DEBUG(getApplicationLogger(),
00252 "Trying to create message service presence ");
00253 edm::PresenceFactory *pf = edm::PresenceFactory::get();
00254 if(pf != 0) {
00255 pf->makePresence("MessageServicePresence").release();
00256 }
00257 else {
00258 LOG4CPLUS_ERROR(getApplicationLogger(),
00259 "Unable to create message service presence ");
00260 }
00261 }
00262 catch(...) {
00263 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
00264 }
00265 ML::MLlog4cplus::setAppl(this);
00266
00267 typedef std::set<xdaq::ApplicationDescriptor*> AppDescSet_t;
00268 typedef AppDescSet_t::iterator AppDescIter_t;
00269
00270 AppDescSet_t rcms=
00271 getApplicationContext()->getDefaultZone()->
00272 getApplicationDescriptors("RCMSStateListener");
00273 if(rcms.size()==0)
00274 {
00275 LOG4CPLUS_WARN(getApplicationLogger(),
00276 "MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
00277
00278 }
00279 else
00280 {
00281 AppDescIter_t it = rcms.begin();
00282 evtProcessor_.setRcms(*it);
00283 }
00284 pthread_mutex_init(&start_lock_,0);
00285 pthread_mutex_init(&stop_lock_,0);
00286 pthread_mutex_init(&pickup_lock_,0);
00287
00288 forkInfoObj_=nullptr;
00289 pthread_mutex_init(&forkObjLock_,0);
00290 makeStaticInfo();
00291 startSupervisorLoop();
00292
00293 if(vulture_==0) vulture_ = new Vulture(true);
00294
00296
00297 AppDescSet_t setOfiDie=
00298 getApplicationContext()->getDefaultZone()->
00299 getApplicationDescriptors("evf::iDie");
00300
00301 for (AppDescIter_t it=setOfiDie.begin();it!=setOfiDie.end();++it)
00302 if ((*it)->getInstance()==0)
00303 iDieUrl_ = (*it)->getContextDescriptor()->getURL() + "/" + (*it)->getURN();
00304 }
00305
00306
00307
00308
00309 FUEventProcessor::~FUEventProcessor()
00310 {
00311
00312
00313
00314 if(vulture_ != 0) delete vulture_;
00315 }
00316
00317
00319
00321
00322
00323
00324 bool FUEventProcessor::configuring(toolbox::task::WorkLoop* wl)
00325 {
00326
00327
00328
00329
00330
00331 unsigned short smap
00332 = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00333 + (((instance_.value_%80)==0) ? 0x8 : 0)
00334 + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
00335 + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
00336 + (hasPrescaleService_.value_ ? 0x1 : 0);
00337 if(nbSubProcesses_.value_==0)
00338 {
00339 spMStates_.setSize(1);
00340 spmStates_.setSize(1);
00341 }
00342 else
00343 {
00344 spMStates_.setSize(nbSubProcesses_.value_);
00345 spmStates_.setSize(nbSubProcesses_.value_);
00346 for(unsigned int i = 0; i < spMStates_.size(); i++)
00347 {
00348 spMStates_[i] = edm::event_processor::sInit;
00349 spmStates_[i] = 0;
00350 }
00351 }
00352 try {
00353 LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ...");
00354 std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00355 epInitialized_=true;
00356 if(evtProcessor_)
00357 {
00358
00359 mwrRef_ = evtProcessor_.getModuleWebRegistry();
00360 sorRef_ = evtProcessor_.getShmOutputModuleRegistry();
00361
00362 configuration_ = evtProcessor_.configuration();
00363 if(nbSubProcesses_.value_==0) evtProcessor_.startMonitoringWorkLoop();
00364 evtProcessor_->beginJob();
00365 if(cpustat_) {delete cpustat_; cpustat_=0;}
00366 cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00367 nbSubProcesses_.value_,
00368 instance_.value_,
00369 iDieUrl_.value_);
00370 if(ratestat_) {delete ratestat_; ratestat_=0;}
00371 ratestat_ = new RateStat(iDieUrl_.value_);
00372 if(iDieStatisticsGathering_.value_){
00373 try{
00374 cpustat_->sendLegenda(evtProcessor_.getmicromap());
00375 xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00376 if(legenda !=0){
00377 std::string slegenda = ((xdata::String*)legenda)->value_;
00378 ratestat_->sendLegenda(slegenda);
00379 }
00380
00381 }
00382 catch(evf::Exception &e){
00383 LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
00384 << e.what());
00385 }
00386 }
00387
00388 fsm_.fireEvent("ConfigureDone",this);
00389 LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!");
00390 localLog("-I- Configuration completed");
00391
00392 }
00393 }
00394 catch (xcept::Exception &e) {
00395 reasonForFailedState_ = "configuring FAILED: " + (std::string)e.what();
00396 fsm_.fireFailed(reasonForFailedState_,this);
00397 localLog(reasonForFailedState_);
00398 }
00399 catch(cms::Exception &e) {
00400 reasonForFailedState_ = e.explainSelf();
00401 fsm_.fireFailed(reasonForFailedState_,this);
00402 localLog(reasonForFailedState_);
00403 }
00404 catch(std::exception &e) {
00405 reasonForFailedState_ = e.what();
00406 fsm_.fireFailed(reasonForFailedState_,this);
00407 localLog(reasonForFailedState_);
00408 }
00409 catch(...) {
00410 fsm_.fireFailed("Unknown Exception",this);
00411 }
00412
00413 if(vulture_!=0 && vp_ == 0) vp_ = vulture_->makeProcess();
00414
00415 return false;
00416 }
00417
00418
00419
00420
00421
00422 bool FUEventProcessor::enabling(toolbox::task::WorkLoop* wl)
00423 {
00424 nbTotalDQM_ = 0;
00425 scalersUpdates_ = 0;
00426
00427
00428
00429
00430
00431 unsigned short smap
00432 = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00433 + (((instance_.value_%80)==0) ? 0x8 : 0)
00434 + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
00435 + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
00436 + (hasPrescaleService_.value_ ? 0x1 : 0);
00437
00438 LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling...");
00439 if(!epInitialized_){
00440 evtProcessor_.forceInitEventProcessorMaybe();
00441 }
00442 std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00443
00444 mwrRef_ = evtProcessor_.getModuleWebRegistry();
00445 sorRef_ = evtProcessor_.getShmOutputModuleRegistry();
00446
00447 if(!epInitialized_){
00448 evtProcessor_->beginJob();
00449 if(cpustat_) {delete cpustat_; cpustat_=0;}
00450 cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00451 nbSubProcesses_.value_,
00452 instance_.value_,
00453 iDieUrl_.value_);
00454 if(ratestat_) {delete ratestat_; ratestat_=0;}
00455 ratestat_ = new RateStat(iDieUrl_.value_);
00456 if(iDieStatisticsGathering_.value_){
00457 try
00458 {
00459 cpustat_->sendLegenda(evtProcessor_.getmicromap());
00460 xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00461 if(legenda !=0)
00462 {
00463 std::string slegenda = ((xdata::String*)legenda)->value_;
00464 ratestat_->sendLegenda(slegenda);
00465 }
00466 }
00467 catch(evf::Exception &e)
00468 {
00469 LOG4CPLUS_INFO(getApplicationLogger(),"could not send legenda"
00470 << e.what());
00471 }
00472 catch (xcept::Exception& e) {
00473 LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda."
00474 << e.what());
00475 }
00476 }
00477 epInitialized_ = true;
00478 }
00479 configuration_ = evtProcessor_.configuration();
00480 evtProcessor_.resetLumiSectionReferenceIndex();
00481
00482 if(nbSubProcesses_.value_==0) return enableClassic();
00483
00484 pthread_mutex_lock(&start_lock_);
00485 pthread_mutex_lock(&pickup_lock_);
00486 subs_.clear();
00487 subs_.resize(nbSubProcesses_.value_);
00488 pid_t retval = -1;
00489
00490 for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00491 {
00492 subs_[i]=SubProcess(i,retval);
00493 }
00494 pthread_mutex_unlock(&pickup_lock_);
00495
00496 pthread_mutex_unlock(&start_lock_);
00497
00498
00499 try {
00500 if (sorRef_) {
00501 unsigned int nbExpectedEPWriters = nbSubProcesses_.value_;
00502 if (nbExpectedEPWriters==0) nbExpectedEPWriters=1;
00503 std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
00504 for (unsigned int i=0;i<shmOutputs.size();i++) {
00505 shmOutputs[i]->setNExpectedEPs(nbExpectedEPWriters);
00506 }
00507 }
00508 }
00509 catch (...)
00510 {
00511 LOG4CPLUS_ERROR(getApplicationLogger(),"Thrown Exception while setting nExpectedEPs in shmOutputs");
00512 }
00513
00514
00515 edm_init_done_=true;
00516 if (forkInEDM_.value_) {
00517 edm_init_done_=false;
00518 enableForkInEDM();
00519 }
00520 else
00521 for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00522 {
00523 retval = subs_[i].forkNew();
00524 if(retval==0)
00525 {
00526 myProcess_ = &subs_[i];
00527
00528 delete toolbox::mem::_s_mutex_ptr_;
00529 toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
00530 int retval = pthread_mutex_destroy(&stop_lock_);
00531 if(retval != 0) perror("error");
00532 retval = pthread_mutex_init(&stop_lock_,0);
00533 if(retval != 0) perror("error");
00534 fsm_.disableRcmsStateNotification();
00535
00536 return enableMPEPSlave();
00537
00538 }
00539 }
00540
00541 if (!edm_init_done_) {
00542
00543 LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00544 fsm_.fireEvent("EnableDone",this);
00545 localLog("-I- Start completed");
00546
00547 edm::event_processor::State st;
00548 while (!edm_init_done_) {
00549 usleep(10000);
00550 st = evtProcessor_->getState();
00551 if (st==edm::event_processor::sError || st==edm::event_processor::sInvalid) break;
00552 }
00553 st = evtProcessor_->getState();
00554
00555 if (st!=edm::event_processor::sRunning) {
00556 reasonForFailedState_ = std::string("Master edm::EventProcessor in state ") + evtProcessor_->stateName(st);
00557 localLog(reasonForFailedState_);
00558 fsm_.fireFailed(reasonForFailedState_,this);
00559 return false;
00560 }
00561 startSummarizeWorkLoop();
00562 vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00563 return false;
00564 }
00565
00566 startSummarizeWorkLoop();
00567 vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00568 LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00569 fsm_.fireEvent("EnableDone",this);
00570 localLog("-I- Start completed");
00571 return false;
00572 }
00573
00574 bool FUEventProcessor::doEndRunInEDM() {
00575
00576 if (forkInfoObj_) {
00577
00578 int count = 30;
00579 while (!edm_init_done_ && count) {
00580 ::sleep(1);
00581 if (count%5==0)
00582 LOG4CPLUS_WARN(log_,"MASTER EP: Stopping while EP busy in beginRun. waiting " <<count<< "sec");
00583 count--;
00584 }
00585
00586
00587 if (evtProcessor_->getState()==edm::event_processor::sJobReady)
00588 return true;
00589
00590 forkInfoObj_->lock();
00591 forkInfoObj_->stopCondition=true;
00592 sem_post(forkInfoObj_->control_sem_);
00593 forkInfoObj_->unlock();
00594
00595 count = 30;
00596
00597 edm::event_processor::State st;
00598 while(count--) {
00599 st = evtProcessor_->getState();
00600 if (st==edm::event_processor::sRunning) ::usleep(100000);
00601 else if (st==edm::event_processor::sStopping || st==edm::event_processor::sJobReady) {
00602 break;
00603 }
00604 else {
00605 LOG4CPLUS_ERROR(getApplicationLogger(),
00606 "Master edm::EventProcessor is in state "<< evtProcessor_->stateName(st) << " while stopping");
00607 return false;
00608 }
00609 }
00610 if (count<0) {
00611 LOG4CPLUS_ERROR(getApplicationLogger(),
00612 "Timeout waiting for Master edm::EventProcessor to go stopping state:"<<evtProcessor_->stateName(st));
00613 return false;
00614 }
00615 }
00616 return true;
00617 }
00618
00619
00620 bool FUEventProcessor::stopping(toolbox::task::WorkLoop* wl)
00621 {
00622 if(nbSubProcesses_.value_!=0) {
00623 stopSlavesAndAcknowledge();
00624 if (forkInEDM_.value_) doEndRunInEDM();
00625 }
00626 vulture_->stop();
00627
00628 if (forkInEDM_.value_) {
00629 bool tmpHasShMem_=hasShMem_;
00630 hasShMem_=false;
00631 bool stop_status = stopClassic();
00632 hasShMem_=tmpHasShMem_;
00633 return stop_status;
00634 }
00635 return stopClassic();
00636 }
00637
00638
00639
00640 bool FUEventProcessor::halting(toolbox::task::WorkLoop* wl)
00641 {
00642 LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ...");
00643 if(nbSubProcesses_.value_!=0) {
00644 stopSlavesAndAcknowledge();
00645 if (forkInEDM_.value_) doEndRunInEDM();
00646 }
00647 try{
00648 evtProcessor_.stopAndHalt();
00649
00650 if (forkInfoObj_) {
00651 delete forkInfoObj_;
00652 forkInfoObj_=0;
00653 }
00654 }
00655 catch (evf::Exception &e) {
00656 reasonForFailedState_ = "halting FAILED: " + (std::string)e.what();
00657 localLog(reasonForFailedState_);
00658 fsm_.fireFailed(reasonForFailedState_,this);
00659 }
00660
00661
00662 LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!");
00663 fsm_.fireEvent("HaltDone",this);
00664
00665 localLog("-I- Halt completed");
00666 return false;
00667 }
00668
00669
00670
00671 xoap::MessageReference FUEventProcessor::fsmCallback(xoap::MessageReference msg)
00672 throw (xoap::exception::Exception)
00673 {
00674 return fsm_.commandCallback(msg);
00675 }
00676
00677
00678
00679 void FUEventProcessor::actionPerformed(xdata::Event& e)
00680 {
00681
00682 if (e.type()=="ItemChangedEvent" && fsm_.stateName()->toString()!="Halted") {
00683 std::string item = dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00684
00685 if ( item == "parameterSet") {
00686 LOG4CPLUS_WARN(getApplicationLogger(),
00687 "HLT Menu changed, force reinitialization of EventProcessor");
00688 epInitialized_ = false;
00689 }
00690 if ( item == "outputEnabled") {
00691 if(outprev_ != outPut_) {
00692 LOG4CPLUS_WARN(getApplicationLogger(),
00693 (outprev_ ? "Disabling " : "Enabling ")<<"global output");
00694 evtProcessor_->enableEndPaths(outPut_);
00695 outprev_ = outPut_;
00696 }
00697 }
00698 if (item == "globalInputPrescale") {
00699 LOG4CPLUS_WARN(getApplicationLogger(),
00700 "Setting global input prescale has no effect "
00701 <<"in this version of the code");
00702 }
00703 if ( item == "globalOutputPrescale") {
00704 LOG4CPLUS_WARN(getApplicationLogger(),
00705 "Setting global output prescale has no effect "
00706 <<"in this version of the code");
00707 }
00708 }
00709
00710 }
00711
00712
00713 void FUEventProcessor::subWeb(xgi::Input *in, xgi::Output *out)
00714 {
00715 using namespace cgicc;
00716 pid_t pid = 0;
00717 std::ostringstream ost;
00718 ost << "&";
00719
00720 Cgicc cgi(in);
00721 internal::MyCgi *mycgi = (internal::MyCgi*)in;
00722 for(std::map<std::string, std::string, std::less<std::string> >::iterator mit =
00723 mycgi->getEnvironment().begin();
00724 mit != mycgi->getEnvironment().end(); mit++)
00725 ost << mit->first << "%" << mit->second << ";";
00726 std::vector<FormEntry> els = cgi.getElements() ;
00727 std::vector<FormEntry> el1;
00728 cgi.getElement("method",el1);
00729 std::vector<FormEntry> el2;
00730 cgi.getElement("process",el2);
00731 if(el1.size()!=0) {
00732 std::string meth = el1[0].getValue();
00733 if(el2.size()!=0) {
00734 unsigned int i = 0;
00735 std::string mod = el2[0].getValue();
00736 pid = atoi(mod.c_str());
00737 for(; i < subs_.size(); i++)
00738 if(subs_[i].pid()==pid) break;
00739 if(i>=subs_.size()){
00740 *out << "ERROR 404 : Process " << pid << " Not Found !" << std::endl;
00741 return;
00742 }
00743 if(subs_[i].alive() != 1){
00744 *out << "ERROR 405 : Process " << pid << " Not Alive !" << std::endl;
00745 return;
00746 }
00747 MsgBuf msg1(meth.length()+ost.str().length()+1,MSQM_MESSAGE_TYPE_WEB);
00748 strncpy(msg1->mtext,meth.c_str(),meth.length());
00749 strncpy(msg1->mtext+meth.length(),ost.str().c_str(),ost.str().length());
00750 subs_[i].post(msg1,true);
00751 unsigned int keep_supersleep_original_value = superSleepSec_.value_;
00752 superSleepSec_.value_=10*keep_supersleep_original_value;
00753 MsgBuf msg(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_WEB);
00754 bool done = false;
00755 std::vector<char *>pieces;
00756 while(!done){
00757 unsigned long retval1 = subs_[i].rcvNonBlocking(msg,true);
00758 if(retval1 == MSGQ_MESSAGE_TYPE_RANGE){
00759 ::sleep(1);
00760 continue;
00761 }
00762 unsigned int nbytes = atoi(msg->mtext);
00763 if(nbytes < MAX_PIPE_BUFFER_SIZE) done = true;
00764 char *buf= new char[nbytes];
00765 ssize_t retval = read(anonymousPipe_[PIPE_READ],buf,nbytes);
00766 if(retval!=nbytes) std::cout
00767 << "CAREFUL HERE, read less bytes than expected from pipe in subWeb" << std::endl;
00768 pieces.push_back(buf);
00769 }
00770 superSleepSec_.value_=keep_supersleep_original_value;
00771 for(unsigned int j = 0; j < pieces.size(); j++){
00772 *out<<pieces[j];
00773 delete[] pieces[j];
00774 }
00775 }
00776 }
00777 }
00778
00779
00780
00781 void FUEventProcessor::defaultWebPage(xgi::Input *in, xgi::Output *out)
00782 throw (xgi::exception::Exception)
00783 {
00784
00785
00786 *out << "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">"
00787 << "<html><head><title>" << getApplicationDescriptor()->getClassName() << (nbSubProcesses_.value_ > 0 ? "MP " : " ")
00788 << getApplicationDescriptor()->getInstance() << "</title>"
00789 << "<meta https-equiv=\"REFRESH\" content=\"0;url=/evf/html/defaultBasePage.html\">"
00790 << "</head></html>";
00791 }
00792
00793
00794
00795
00796
00797 void FUEventProcessor::spotlightWebPage(xgi::Input *in, xgi::Output *out)
00798 throw (xgi::exception::Exception)
00799 {
00800
00801 std::string urn = getApplicationDescriptor()->getURN();
00802
00803 *out << "<!-- base href=\"/" << urn
00804 << "\"> -->" << std::endl;
00805 *out << "<html>" << std::endl;
00806 *out << "<head>" << std::endl;
00807 *out << "<link type=\"text/css\" rel=\"stylesheet\"";
00808 *out << " href=\"/evf/html/styles.css\"/>" << std::endl;
00809 *out << "<title>" << getApplicationDescriptor()->getClassName()
00810 << getApplicationDescriptor()->getInstance()
00811 << " MAIN</title>" << std::endl;
00812 *out << "</head>" << std::endl;
00813 *out << "<body>" << std::endl;
00814 *out << "<table border=\"0\" width=\"100%\">" << std::endl;
00815 *out << "<tr>" << std::endl;
00816 *out << " <td align=\"left\">" << std::endl;
00817 *out << " <img" << std::endl;
00818 *out << " align=\"middle\"" << std::endl;
00819 *out << " src=\"/evf/images/spoticon.jpg\"" << std::endl;
00820 *out << " alt=\"main\"" << std::endl;
00821 *out << " width=\"64\"" << std::endl;
00822 *out << " height=\"64\"" << std::endl;
00823 *out << " border=\"\"/>" << std::endl;
00824 *out << " <b>" << std::endl;
00825 *out << getApplicationDescriptor()->getClassName()
00826 << getApplicationDescriptor()->getInstance() << std::endl;
00827 *out << " " << fsm_.stateName()->toString() << std::endl;
00828 *out << " </b>" << std::endl;
00829 *out << " </td>" << std::endl;
00830 *out << " <td width=\"32\">" << std::endl;
00831 *out << " <a href=\"/urn:xdaq-application:lid=3\">" << std::endl;
00832 *out << " <img" << std::endl;
00833 *out << " align=\"middle\"" << std::endl;
00834 *out << " src=\"/hyperdaq/images/HyperDAQ.jpg\"" << std::endl;
00835 *out << " alt=\"HyperDAQ\"" << std::endl;
00836 *out << " width=\"32\"" << std::endl;
00837 *out << " height=\"32\"" << std::endl;
00838 *out << " border=\"\"/>" << std::endl;
00839 *out << " </a>" << std::endl;
00840 *out << " </td>" << std::endl;
00841 *out << " <td width=\"32\">" << std::endl;
00842 *out << " </td>" << std::endl;
00843 *out << " <td width=\"32\">" << std::endl;
00844 *out << " <a href=\"/" << urn << "/\">" << std::endl;
00845 *out << " <img" << std::endl;
00846 *out << " align=\"middle\"" << std::endl;
00847 *out << " src=\"/evf/images/epicon.jpg\"" << std::endl;
00848 *out << " alt=\"main\"" << std::endl;
00849 *out << " width=\"32\"" << std::endl;
00850 *out << " height=\"32\"" << std::endl;
00851 *out << " border=\"\"/>" << std::endl;
00852 *out << " </a>" << std::endl;
00853 *out << " </td>" << std::endl;
00854 *out << "</tr>" << std::endl;
00855 *out << "</table>" << std::endl;
00856
00857 *out << "<hr/>" << std::endl;
00858
00859 std::ostringstream ost;
00860 if(myProcess_)
00861 ost << "/SubWeb?process=" << getpid() << "&method=moduleWeb&";
00862 else
00863 ost << "/moduleWeb?";
00864 urn += ost.str();
00865 if(evtProcessor_ && (myProcess_ || nbSubProcesses_.value_==0))
00866 evtProcessor_.taskWebPage(in,out,urn);
00867 else if(evtProcessor_)
00868 evtProcessor_.summaryWebPage(in,out,urn);
00869 else
00870 *out << "<td>HLT Unconfigured</td>" << std::endl;
00871 *out << "</table>" << std::endl;
00872
00873 *out << "<br><textarea rows=" << 10 << " cols=80 scroll=yes>" << std::endl;
00874 *out << configuration_ << std::endl;
00875 *out << "</textarea><P>" << std::endl;
00876
00877 *out << "</body>" << std::endl;
00878 *out << "</html>" << std::endl;
00879
00880
00881 }
00882 void FUEventProcessor::scalersWeb(xgi::Input *in, xgi::Output *out)
00883 throw (xgi::exception::Exception)
00884 {
00885
00886 out->getHTTPResponseHeader().addHeader( "Content-Type",
00887 "application/octet-stream" );
00888 out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
00889 "binary" );
00890 if(evtProcessor_ != 0){
00891 out->write( (char*)(evtProcessor_.getPackedTriggerReportAsStruct()), sizeof(TriggerReportStatic) );
00892 }
00893 }
00894
00895 void FUEventProcessor::pathNames(xgi::Input *in, xgi::Output *out)
00896 throw (xgi::exception::Exception)
00897 {
00898
00899 if(evtProcessor_ != 0){
00900 xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00901 if(legenda !=0){
00902 std::string slegenda = ((xdata::String*)legenda)->value_;
00903 *out << slegenda << std::endl;
00904 }
00905 }
00906 }
00907
00908
00909 void FUEventProcessor::setAttachDqmToShm() throw (evf::Exception)
00910 {
00911 std::string errmsg;
00912 try {
00913 edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
00914 if(edm::Service<FUShmDQMOutputService>().isAvailable())
00915 edm::Service<FUShmDQMOutputService>()->setAttachToShm();
00916 }
00917 catch (cms::Exception& e) {
00918 errmsg = "Failed to set to attach DQM service to shared memory: " + (std::string)e.what();
00919 }
00920 if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
00921 }
00922
00923
00924 void FUEventProcessor::attachDqmToShm() throw (evf::Exception)
00925 {
00926 std::string errmsg;
00927 bool success = false;
00928 try {
00929 edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
00930 if(edm::Service<FUShmDQMOutputService>().isAvailable())
00931 success = edm::Service<FUShmDQMOutputService>()->attachToShm();
00932 if (!success) errmsg = "Failed to attach DQM service to shared memory";
00933 }
00934 catch (cms::Exception& e) {
00935 errmsg = "Failed to attach DQM service to shared memory: " + (std::string)e.what();
00936 }
00937 if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
00938 }
00939
00940
00941
00942 void FUEventProcessor::detachDqmFromShm() throw (evf::Exception)
00943 {
00944 std::string errmsg;
00945 bool success = false;
00946 try {
00947 edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
00948 if(edm::Service<FUShmDQMOutputService>().isAvailable())
00949 success = edm::Service<FUShmDQMOutputService>()->detachFromShm();
00950 if (!success) errmsg = "Failed to detach DQM service from shared memory";
00951 }
00952 catch (cms::Exception& e) {
00953 errmsg = "Failed to detach DQM service from shared memory: " + (std::string)e.what();
00954 }
00955 if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
00956 }
00957
00958
00959 std::string FUEventProcessor::logsAsString()
00960 {
00961 std::ostringstream oss;
00962 if(logWrap_)
00963 {
00964 for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
00965 oss << logRing_[i] << std::endl;
00966 for(unsigned int i = 0; i < logRingIndex_; i++)
00967 oss << logRing_[i] << std::endl;
00968 }
00969 else
00970 for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
00971 oss << logRing_[i] << std::endl;
00972
00973 return oss.str();
00974 }
00975
00976 void FUEventProcessor::localLog(std::string m)
00977 {
00978 timeval tv;
00979
00980 gettimeofday(&tv,0);
00981 tm *uptm = localtime(&tv.tv_sec);
00982 char datestring[256];
00983 strftime(datestring, sizeof(datestring),"%c", uptm);
00984
00985 if(logRingIndex_ == 0){logWrap_ = true; logRingIndex_ = logRingSize_;}
00986 logRingIndex_--;
00987 std::ostringstream timestamp;
00988 timestamp << " at " << datestring;
00989 m += timestamp.str();
00990 logRing_[logRingIndex_] = m;
00991 }
00992
00993 void FUEventProcessor::startSupervisorLoop()
00994 {
00995 try {
00996 wlSupervising_=
00997 toolbox::task::getWorkLoopFactory()->getWorkLoop("Supervisor",
00998 "waiting");
00999 if (!wlSupervising_->isActive()) wlSupervising_->activate();
01000 asSupervisor_ = toolbox::task::bind(this,&FUEventProcessor::supervisor,
01001 "Supervisor");
01002 wlSupervising_->submit(asSupervisor_);
01003 supervising_ = true;
01004 }
01005 catch (xcept::Exception& e) {
01006 std::string msg = "Failed to start workloop 'Supervisor'.";
01007 XCEPT_RETHROW(evf::Exception,msg,e);
01008 }
01009 }
01010
01011 void FUEventProcessor::startReceivingLoop()
01012 {
01013 try {
01014 wlReceiving_=
01015 toolbox::task::getWorkLoopFactory()->getWorkLoop("Receiving",
01016 "waiting");
01017 if (!wlReceiving_->isActive()) wlReceiving_->activate();
01018 asReceiveMsgAndExecute_ = toolbox::task::bind(this,&FUEventProcessor::receiving,
01019 "Receiving");
01020 wlReceiving_->submit(asReceiveMsgAndExecute_);
01021 receiving_ = true;
01022 }
01023 catch (xcept::Exception& e) {
01024 std::string msg = "Failed to start workloop 'Receiving'.";
01025 XCEPT_RETHROW(evf::Exception,msg,e);
01026 }
01027 }
01028 void FUEventProcessor::startReceivingMonitorLoop()
01029 {
01030 try {
01031 wlReceivingMonitor_=
01032 toolbox::task::getWorkLoopFactory()->getWorkLoop("ReceivingM",
01033 "waiting");
01034 if (!wlReceivingMonitor_->isActive())
01035 wlReceivingMonitor_->activate();
01036 asReceiveMsgAndRead_ =
01037 toolbox::task::bind(this,&FUEventProcessor::receivingAndMonitor,
01038 "ReceivingM");
01039 wlReceivingMonitor_->submit(asReceiveMsgAndRead_);
01040 receivingM_ = true;
01041 }
01042 catch (xcept::Exception& e) {
01043 std::string msg = "Failed to start workloop 'ReceivingM'.";
01044 XCEPT_RETHROW(evf::Exception,msg,e);
01045 }
01046 }
01047
01048 bool FUEventProcessor::receiving(toolbox::task::WorkLoop *)
01049 {
01050 MsgBuf msg;
01051 try{
01052 myProcess_->rcvSlave(msg,false);
01053 if(msg->mtype==MSQM_MESSAGE_TYPE_STOP)
01054 {
01055 pthread_mutex_lock(&stop_lock_);
01056 fsm_.fireEvent("Stop",this);
01057 try{
01058 LOG4CPLUS_DEBUG(getApplicationLogger(),
01059 "Trying to create message service presence ");
01060 edm::PresenceFactory *pf = edm::PresenceFactory::get();
01061 if(pf != 0) {
01062 pf->makePresence("MessageServicePresence").release();
01063 }
01064 else {
01065 LOG4CPLUS_ERROR(getApplicationLogger(),
01066 "Unable to create message service presence ");
01067 }
01068 }
01069 catch(...) {
01070 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
01071 }
01072 stopClassic();
01073 MsgBuf msg1(0,MSQS_MESSAGE_TYPE_STOP);
01074 myProcess_->postSlave(msg1,false);
01075 pthread_mutex_unlock(&stop_lock_);
01076 fclose(stdout);
01077 fclose(stderr);
01078 _exit(EXIT_SUCCESS);
01079 }
01080 if(msg->mtype==MSQM_MESSAGE_TYPE_FSTOP)
01081 _exit(EXIT_SUCCESS);
01082 }
01083 catch(evf::Exception &e){}
01084 return true;
01085 }
01086
// "Supervisor" work-loop action (bound in startSupervisorLoop()).
// One pass does, in order:
//   1. size subs_/spMStates_/spmStates_ to nbSubProcesses_ and initialise the
//      state vectors;
//   2. reap exited children with waitpid(WNOHANG) and record why they ended;
//   3. if the FSM is "Enabled", edm_init_done_ is set and autoRestartSlaves_
//      is on, re-fork slots that are not alive and whose countdown reached 0;
//   4. if "Enabled" and edm_init_done_, query every live slot for its
//      progress record and aggregate the counters into the info-space items;
//   5. fire an item-group-changed on monitorInfoSpace_ and sleep
//      superSleepSec_ seconds.
// Returns true on the normal path. false is returned only on the child side of
// a classic fork (after enableMPEPSlave()).
01087 bool FUEventProcessor::supervisor(toolbox::task::WorkLoop *)
01088 {
01089 pthread_mutex_lock(&stop_lock_);
// Re-checked under pickup_lock_ before resizing the per-slot vectors.
01090 if(subs_.size()!=nbSubProcesses_.value_)
01091 {
01092 pthread_mutex_lock(&pickup_lock_);
01093 if(subs_.size()!=nbSubProcesses_.value_) {
01094 subs_.resize(nbSubProcesses_.value_);
01095 spMStates_.resize(nbSubProcesses_.value_);
01096 spmStates_.resize(nbSubProcesses_.value_);
01097 for(unsigned int i = 0; i < spMStates_.size(); i++)
01098 {
01099 spMStates_[i] = edm::event_processor::sInit;
01100 spmStates_[i] = 0;
01101 }
01102 }
01103 pthread_mutex_unlock(&pickup_lock_);
01104 }
01105 bool running = fsm_.stateName()->toString()=="Enabled";
01106 bool stopping = fsm_.stateName()->toString()=="stopping";
// Reap pass: slots whose alive() is -1000 are skipped (presumably "never
// started / disconnected" -- confirm against the SubProcess class).
01107 for(unsigned int i = 0; i < subs_.size(); i++)
01108 {
01109 if(subs_[i].alive()==-1000) continue;
01110 int sl;
01111 pid_t sub_pid = subs_[i].pid();
01112 pid_t killedOrNot = waitpid(sub_pid,&sl,WNOHANG);
01113
// waitpid returns the pid only when that child changed state.
01114 if(killedOrNot && killedOrNot==sub_pid) {
01115 pthread_mutex_lock(&pickup_lock_);
01116
01117 if (i<subs_.size() && subs_[i].alive()!=-1000) {
// Status 0 for a normal exit, -1 otherwise (signal or stop).
01118 subs_[i].setStatus((WIFEXITED(sl) != 0 ? 0 : -1));
01119 std::ostringstream ost;
01120 if(subs_[i].alive()==0) ost << " process exited with status " << WEXITSTATUS(sl);
01121 else if(WIFSIGNALED(sl)!=0) ost << " process terminated with signal " << WTERMSIG(sl);
01122 else ost << " process stopped ";
// Arm the restart delay and remember the reason for the web/log display.
01123 subs_[i].countdown()=slaveRestartDelaySecs_.value_;
01124 subs_[i].setReasonForFailed(ost.str());
01125 spMStates_[i] = evtProcessor_.notstarted_state_code();
01126 spmStates_[i] = 0;
01127 std::ostringstream ost1;
01128 ost1 << "-E- Slave " << subs_[i].pid() << ost.str();
01129 localLog(ost1.str());
01130 if(!autoRestartSlaves_.value_) subs_[i].disconnect();
01131 }
01132 pthread_mutex_unlock(&pickup_lock_);
01133 }
01134 }
01135 pthread_mutex_unlock(&stop_lock_);
// While stopping, skip restart, polling, the info-space update and the sleep.
01136 if(stopping) return true;
01137
01138 if(running && edm_init_done_)
01139 {
01140
01141
01142 if(autoRestartSlaves_.value_){
01143 pthread_mutex_lock(&stop_lock_);
01144 for(unsigned int i = 0; i < subs_.size(); i++)
01145 {
01146 if(subs_[i].alive() != 1){
01147 if(subs_[i].countdown() == 0)
01148 {
// Give up on a slot once restartCount() exceeds 2.
01149 if(subs_[i].restartCount()>2){
01150 LOG4CPLUS_WARN(getApplicationLogger()," Not restarting subprocess in slot " << i
01151 << " - maximum restart count reached");
01152 std::ostringstream ost1;
01153 ost1 << "-W- Dead Process in slot " << i << " reached maximum restart count";
01154 localLog(ost1.str());
// Moves the countdown off 0 so this branch is not entered again for the slot.
01155 subs_[i].countdown()--;
01156 XCEPT_DECLARE(evf::Exception,
01157 sentinelException, ost1.str());
01158 notifyQualified("error",sentinelException);
01159 subs_[i].disconnect();
01160 continue;
01161 }
01162 subs_[i].restartCount()++;
01163 if (forkInEDM_.value_) {
01164 restartForkInEDM(i);
01165 }
01166 else {
01167 pid_t rr = subs_[i].forkNew();
// rr == 0: we are the newly forked child.
01168 if(rr==0)
01169 {
01170 myProcess_=&subs_[i];
01171 scalersUpdates_ = 0;
// stop_lock_ was taken before the fork; the child destroys and re-creates it.
01172 int retval = pthread_mutex_destroy(&stop_lock_);
01173 if(retval != 0) perror("error");
01174 retval = pthread_mutex_init(&stop_lock_,0);
01175 if(retval != 0) perror("error");
// Drive the child's FSM copy through Stop/StopDone/Enable without RCMS notification.
01176 fsm_.disableRcmsStateNotification();
01177 fsm_.fireEvent("Stop",this);
01178 fsm_.fireEvent("StopDone",this);
01179 fsm_.fireEvent("Enable",this);
01180 try{
01181 xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
01182 if(lsid) {
01183 ((xdata::UnsignedInteger32*)(lsid))->value_--;
01184 }
01185 }
01186 catch(...){
01187 std::cout << "trouble with lsindex during restart" << std::endl;
01188 }
01189 try{
01190 xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
01191 if(lstb) {
01192 ((xdata::Boolean*)(lstb))->value_ = false;
01193 }
01194 }
01195 catch(...){
01196 std::cout << "trouble with resetting flag for eol recovery " << std::endl;
01197 }
01198
01199 evtProcessor_.adjustLsIndexForRestart();
01200 evtProcessor_.resetPackedTriggerReport();
01201 enableMPEPSlave();
// Child side: return false without unlocking (the lock was re-initialised above).
01202 return false;
01203 }
01204 else
01205 {
// NOTE(review): a negative rr (fork failure) also lands here and is logged as a new process.
01206 std::ostringstream ost1;
01207 ost1 << "-I- New Process " << rr << " forked for slot " << i;
01208 localLog(ost1.str());
01209 }
01210 }
01211 }
// One tick of the restart delay per supervisor pass.
01212 if(subs_[i].countdown()>=0) subs_[i].countdown()--;
01213 }
01214 }
01215 pthread_mutex_unlock(&stop_lock_);
01216 }
01217 }
01218 xdata::Serializable *lsid = 0;
01219 xdata::Serializable *psid = 0;
01220 xdata::Serializable *dqmp = 0;
01221 xdata::UnsignedInteger32 *dqm = 0;
01222
01223
01224
// Polling / aggregation pass.
01225 if(running && edm_init_done_){
01226 try{
01227 lsid = applicationInfoSpace_->find("lumiSectionIndex");
01228 psid = applicationInfoSpace_->find("prescaleSetIndex");
// nbProcessed / nbAccepted are not declared in this function; they come from
// an enclosing scope not visible here.
01229 nbProcessed = monitorInfoSpace_->find("nbProcessed");
01230 nbAccepted = monitorInfoSpace_->find("nbAccepted");
01231 dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
01232 }
// NOTE(review): exception is caught by value here; other handlers in this
// file catch by reference.
01233 catch(xdata::exception::Exception e){
01234 LOG4CPLUS_INFO(getApplicationLogger(),"could not retrieve some data - " << e.what());
01235 }
01236
01237 try{
01238 if(nbProcessed !=0 && nbAccepted !=0)
01239 {
01240 xdata::UnsignedInteger32*nbp = ((xdata::UnsignedInteger32*)nbProcessed);
01241 xdata::UnsignedInteger32*nba = ((xdata::UnsignedInteger32*)nbAccepted);
// NOTE(review): ls and ps are dereferenced below without a null check on
// lsid / psid.
01242 xdata::UnsignedInteger32*ls = ((xdata::UnsignedInteger32*)lsid);
01243 xdata::UnsignedInteger32*ps = ((xdata::UnsignedInteger32*)psid);
01244 if(dqmp!=0)
01245 dqm = (xdata::UnsignedInteger32*)dqmp;
// All aggregates are rebuilt from scratch on every pass.
01246 if(dqm) dqm->value_ = 0;
01247 nbTotalDQM_ = 0;
01248 nbp->value_ = 0;
01249 nba->value_ = 0;
01250 nblive_ = 0;
01251 nbdead_ = 0;
01252 scalersUpdates_ = 0;
01253
01254 for(unsigned int i = 0; i < subs_.size(); i++)
01255 {
01256 if(subs_[i].alive()>0)
01257 {
01258 nblive_++;
01259 try{
// Request a progress record and read the reply without blocking.
01260 subs_[i].post(master_message_prg_,true);
01261
01262 unsigned long retval = subs_[i].rcvNonBlocking(master_message_prr_,true);
// A reply counts only if the returned value equals the expected message type.
01263 if(retval == (unsigned long) master_message_prr_->mtype){
01264 prg* p = (struct prg*)(master_message_prr_->mtext);
01265 subs_[i].setParams(p);
01266 spMStates_[i] = p->Ms;
01267 spmStates_[i] = p->ms;
01268 cpustat_->addEntry(p->ms);
01269 if(!subs_[i].inInconsistentState() &&
01270 (p->Ms == edm::event_processor::sError
01271 || p->Ms == edm::event_processor::sInvalid
01272 || p->Ms == edm::event_processor::sStopping))
01273 {
01274 std::ostringstream ost;
01275 ost << "edm::eventprocessor slot " << i << " process id "
01276 << subs_[i].pid() << " not in Running state : Mstate="
01277 << evtProcessor_.stateNameFromIndex(p->Ms) << " mstate="
01278 << evtProcessor_.moduleNameFromIndex(p->ms)
01279 << " - Look into possible error messages from HLT process";
01280 LOG4CPLUS_WARN(getApplicationLogger(),ost.str());
01281 }
01282 nbp->value_ += subs_[i].params().nbp;
01283 nba->value_ += subs_[i].params().nba;
01284 if(dqm)dqm->value_ += p->dqm;
01285 nbTotalDQM_ += p->dqm;
01286 scalersUpdates_ += p->trp;
// Lumi section index only moves forward; prescale index follows the last reporter.
01287 if(p->ls > ls->value_) ls->value_ = p->ls;
01288 if(p->ps != ps->value_) ps->value_ = p->ps;
01289 }
01290 else{
// No fresh reply: fall back to the slot's saved counters.
01291 nbp->value_ += subs_[i].get_save_nbp();
01292 nba->value_ += subs_[i].get_save_nba();
01293 }
01294 }
01295 catch(evf::Exception &e){
01296 LOG4CPLUS_INFO(getApplicationLogger(),
01297 "could not send/receive msg on slot "
01298 << i << " - " << e.what());
01299 }
01300
01301 }
01302 else
01303 {
// Slot not alive: its saved counters still contribute to the totals.
01304 nbp->value_ += subs_[i].get_save_nbp();
01305 nba->value_ += subs_[i].get_save_nba();
01306 nbdead_++;
01307 }
01308 }
// Once more than 64 events were processed in total, look for live slots that
// still report nbp == 0 and ms == 0; after more than 60 such detections the
// slot is sent an FSTOP message and an error is raised to the sentinel.
01309 if(nbp->value_>64){
01310 for(unsigned int i = 0; i < subs_.size(); i++)
01311 {
01312 if(subs_[i].params().nbp == 0){
01313
01314 if(subs_[i].alive()>0 && subs_[i].params().ms == 0)
01315 {
01316 subs_[i].found_invalid();
01317 if(subs_[i].nfound_invalid() > 60){
01318 MsgBuf msg3(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_FSTOP);
01319 subs_[i].post(msg3,false);
01320 std::ostringstream ost1;
01321 ost1 << "-W- Process in slot " << i << " Never reached the running state - forcestopping it";
01322 localLog(ost1.str());
01323 LOG4CPLUS_ERROR(getApplicationLogger(),ost1.str());
01324 XCEPT_DECLARE(evf::Exception,
01325 sentinelException, ost1.str());
01326 notifyQualified("error",sentinelException);
01327
01328 }
01329 }
01330 }
01331 }
01332 }
01333 }
01334 }
01335 catch(std::exception &e){
01336 LOG4CPLUS_INFO(getApplicationLogger(),"std exception - " << e.what());
01337 }
01338 catch(...){
01339 LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception ");
01340 }
01341 }
01342 else{
// Not running: reset the displayed states of slots whose alive() is -1000.
01343 for(unsigned int i = 0; i < subs_.size(); i++)
01344 {
01345 if(subs_[i].alive()==-1000)
01346 {
01347 spMStates_[i] = edm::event_processor::sInit;
01348 spmStates_[i] = 0;
01349 }
01350 }
01351 }
// Publish the updated monitoring items.
// NOTE(review): if fireItemGroupChanged throws, unlock() is skipped.
01352 try{
01353 monitorInfoSpace_->lock();
01354 monitorInfoSpace_->fireItemGroupChanged(names_,0);
01355 monitorInfoSpace_->unlock();
01356 }
01357 catch(xdata::exception::Exception &e)
01358 {
01359 LOG4CPLUS_ERROR(log_, "Exception from fireItemGroupChanged: " << e.what());
01360
01361 }
01362 ::sleep(superSleepSec_.value_);
01363 return true;
01364 }
01365
01366 void FUEventProcessor::startScalersWorkLoop() throw (evf::Exception)
01367 {
01368 try {
01369 wlScalers_=
01370 toolbox::task::getWorkLoopFactory()->getWorkLoop("Scalers",
01371 "waiting");
01372 if (!wlScalers_->isActive()) wlScalers_->activate();
01373 asScalers_ = toolbox::task::bind(this,&FUEventProcessor::scalers,
01374 "Scalers");
01375
01376 wlScalers_->submit(asScalers_);
01377 wlScalersActive_ = true;
01378 }
01379 catch (xcept::Exception& e) {
01380 std::string msg = "Failed to start workloop 'Scalers'.";
01381 XCEPT_RETHROW(evf::Exception,msg,e);
01382 }
01383 }
01384
01385
01386
01387 void FUEventProcessor::startSummarizeWorkLoop() throw (evf::Exception)
01388 {
01389 try {
01390 wlSummarize_=
01391 toolbox::task::getWorkLoopFactory()->getWorkLoop("Summary",
01392 "waiting");
01393 if (!wlSummarize_->isActive()) wlSummarize_->activate();
01394
01395 asSummarize_ = toolbox::task::bind(this,&FUEventProcessor::summarize,
01396 "Summary");
01397
01398 wlSummarize_->submit(asSummarize_);
01399 wlSummarizeActive_ = true;
01400 }
01401 catch (xcept::Exception& e) {
01402 std::string msg = "Failed to start workloop 'Summarize'.";
01403 XCEPT_RETHROW(evf::Exception,msg,e);
01404 }
01405 }
01406
01407
01408
01409 bool FUEventProcessor::scalers(toolbox::task::WorkLoop* wl)
01410 {
01411 if(evtProcessor_)
01412 {
01413 if(!evtProcessor_.getTriggerReport(true)) {
01414 wlScalersActive_ = false;
01415 return false;
01416 }
01417 }
01418 else
01419 {
01420 std::cout << getpid()<< " Scalers workloop, bailing out, no evtProcessor " << std::endl;
01421 wlScalersActive_ = false;
01422 return false;
01423 }
01424 if(myProcess_)
01425 {
01426
01427 int ret = myProcess_->postSlave(evtProcessor_.getPackedTriggerReport(),false);
01428 if(ret!=0) std::cout << "scalers workloop, error posting to sqs_ " << errno << std::endl;
01429 scalersUpdates_++;
01430 }
01431 else
01432 evtProcessor_.fireScalersUpdate();
01433 return true;
01434 }
01435
01436
// "Summary" work-loop action (bound in startSummarizeWorkLoop()).
// Resets the packed trigger report, then for every live slot obtains one
// trigger-report message for the current reference lumi section (either a
// previously postponed one or a freshly received one) and sums it into the
// packed report. If at least one slot contributed, the rolling report is
// updated and a scalers update is fired; otherwise it warns and sleeps 10 s.
// Returns false once the FSM is no longer "Enabled", true otherwise.
01437 bool FUEventProcessor::summarize(toolbox::task::WorkLoop* wl)
01438 {
01439 evtProcessor_.resetPackedTriggerReport();
01440 bool atLeastOneProcessUpdatedSuccessfully = false;
01441 int msgCount = 0;
01442 for (unsigned int i = 0; i < subs_.size(); i++)
01443 {
01444 if(subs_[i].alive()>0)
01445 {
01446 int ret = 0;
// Prefer a report that was postponed earlier for this lumi section.
01447 if(subs_[i].check_postponed_trigger_update(master_message_trr_,
01448 evtProcessor_.getLumiSectionReferenceIndex()))
01449 {
01450 ret = MSQS_MESSAGE_TYPE_TRR;
01451 std::cout << "using postponed report from slot " << i << " for ls " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01452 }
01453 else{
01454 bool insync = false;
01455 bool exception_caught = false;
// Keep receiving until a TRR message for the reference lumi section (or a
// later one) arrives; reports for older lumi sections are discarded.
01456 while(!insync){
01457 try{
01458 ret = subs_[i].rcv(master_message_trr_,false);
01459 }
01460 catch(evf::Exception &e)
01461 {
01462 std::cout << "exception in msgrcv on " << i
01463 << " " << subs_[i].alive() << " " << strerror(errno) << std::endl;
01464 exception_caught = true;
01465 break;
01466
01467 }
01468 if(ret==MSQS_MESSAGE_TYPE_TRR) {
01469 TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
01470 if(trp->lumiSection >= evtProcessor_.getLumiSectionReferenceIndex()){
01471 insync = true;
01472 }
01473 }
01474 }
// A receive failure skips this slot (it is not counted in msgCount).
01475 if(exception_caught) continue;
01476 }
01477 msgCount++;
01478 if(ret==MSQS_MESSAGE_TYPE_TRR) {
01479 TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
// A report from a later lumi section is stored for a later pass instead of
// being summed now.
01480 if(trp->lumiSection > evtProcessor_.getLumiSectionReferenceIndex()){
01481 std::cout << "postpone handling of msg from slot " << i << " with Ls " << trp->lumiSection
01482 << " should be " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01483 subs_[i].add_postponed_trigger_update(master_message_trr_);
01484 }else{
01485 atLeastOneProcessUpdatedSuccessfully = true;
01486 evtProcessor_.sumAndPackTriggerReport(master_message_trr_);
01487 }
01488 }
// NOTE(review): this branch looks unreachable -- both paths above only get
// here with ret == MSQS_MESSAGE_TYPE_TRR.
01489 else std::cout << "msgrcv returned error " << errno << std::endl;
01490 }
01491 }
01492 if(atLeastOneProcessUpdatedSuccessfully){
01493 nbSubProcessesReporting_.value_ = msgCount;
01494 evtProcessor_.getPackedTriggerReportAsStruct()->nbExpected = nbSubProcesses_.value_;
01495 evtProcessor_.getPackedTriggerReportAsStruct()->nbReporting = nbSubProcessesReporting_.value_;
01496 evtProcessor_.updateRollingReport();
01497 evtProcessor_.fireScalersUpdate();
01498 }
01499 else{
01500 LOG4CPLUS_WARN(getApplicationLogger(),"Summarize loop: no process updated successfully - sleep 10 seconds before trying again");
// Only when no message at all was obtained is the lumi increment withdrawn.
01501 if(msgCount==0) evtProcessor_.withdrawLumiSectionIncrement();
01502 nbSubProcessesReporting_.value_ = 0;
01503 ::sleep(10);
01504 }
01505 if(fsm_.stateName()->toString()!="Enabled"){
// NOTE(review): this clears wlScalersActive_, while startSummarizeWorkLoop()
// sets wlSummarizeActive_ -- possibly a copy/paste slip; confirm intent.
01506 wlScalersActive_ = false;
01507 return false;
01508 }
01509
// Optional statistics export for the current reference lumi section.
01510 if(iDieStatisticsGathering_.value_){
01511 try{
01512 TriggerReportStatic *trsp = evtProcessor_.getPackedTriggerReportAsStruct();
01513 cpustat_ ->setNproc(trsp->eventSummary.totalEvents);
01514 cpustat_ ->sendStat(evtProcessor_.getLumiSectionReferenceIndex());
01515 ratestat_->sendStat((unsigned char*)trsp,
01516 sizeof(TriggerReportStatic),
01517 evtProcessor_.getLumiSectionReferenceIndex());
01518 }catch(evf::Exception &e){
01519 LOG4CPLUS_INFO(getApplicationLogger(),"coud not send statistics"
01520 << e.what());
01521 }
01522 }
01523 cpustat_->reset();
01524 return true;
01525 }
01526
01527
01528
// "ReceivingM" work-loop action (bound in startReceivingMonitorLoop()).
// Receives one monitoring request through myProcess_ and answers it:
//   MCS - micro-state page text is sent back in an MCR message;
//   PRG - a progress record (prg) is filled into slave_message_prr_ and posted;
//   WEB - the requested web page is rendered, written to the anonymous pipe
//         in chunks, and each chunk size is announced in a WEB message;
//   TRP - nothing is done.
// An evf::Exception is printed to stdout. Always returns true.
01529 bool FUEventProcessor::receivingAndMonitor(toolbox::task::WorkLoop *)
01530 {
01531 try{
01532 myProcess_->rcvSlave(slave_message_monitoring_,true);
01533 switch(slave_message_monitoring_->mtype)
01534 {
01535 case MSQM_MESSAGE_TYPE_MCS:
01536 {
01537 xgi::Input *in = 0;
01538 xgi::Output out;
01539 evtProcessor_.microState(in,&out);
01540 MsgBuf msg1(out.str().size(),MSQS_MESSAGE_TYPE_MCR);
// NOTE(review): copies exactly size() bytes, so no terminating NUL is written
// by strncpy -- confirm the receiver relies on the message length.
01541 strncpy(msg1->mtext,out.str().c_str(),out.str().size());
01542 myProcess_->postSlave(msg1,true);
01543 break;
01544 }
01545
01546 case MSQM_MESSAGE_TYPE_PRG:
01547 {
01548 xdata::Serializable *dqmp = 0;
01549 xdata::UnsignedInteger32 *dqm = 0;
01550 evtProcessor_.monitoring(0);
01551 try{
01552 dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
01553 } catch(xdata::exception::Exception e){}
01554 if(dqmp!=0)
01555 dqm = (xdata::UnsignedInteger32*)dqmp;
01556
01557
// Fill the progress record in place inside the reply message buffer.
01558 prg * data = (prg*)slave_message_prr_->mtext;
01559 data->ls = evtProcessor_.lsid_;
01560 data->eols = evtProcessor_.lastLumiUsingEol_;
01561 data->ps = evtProcessor_.psid_;
01562 data->nbp = evtProcessor_->totalEvents();
01563 data->nba = evtProcessor_->totalEventsPassed();
01564 data->Ms = evtProcessor_.epMAltState_.value_;
01565 data->ms = evtProcessor_.epmAltState_.value_;
01566 if(dqm) data->dqm = dqm->value_; else data->dqm = 0;
01567 data->trp = scalersUpdates_;
01568
01569 myProcess_->postSlave(slave_message_prr_,true);
01570 if(exitOnError_.value_)
01571 {
01572
01573
// In Stopping/Error state: poll the FSM once per second while it is still
// "Enabled"; after more than 5 polls the process is terminated with _exit(-1).
// The loop ends on its own as soon as the FSM leaves "Enabled".
01574 if(data->Ms == edm::event_processor::sStopping || data->Ms == edm::event_processor::sError)
01575 {
01576 bool running = true;
01577 int count = 0;
01578 while(running){
01579 int retval = pthread_mutex_lock(&stop_lock_);
01580 if(retval != 0) perror("error");
01581 running = fsm_.stateName()->toString()=="Enabled";
01582 if(count>5) _exit(-1);
01583 pthread_mutex_unlock(&stop_lock_);
01584 if(running) {::sleep(1); count++;}
01585 }
01586 }
01587
01588 }
01589
01590 break;
01591 }
01592 case MSQM_MESSAGE_TYPE_WEB:
01593 {
01594 xgi::Input *in = 0;
01595 xgi::Output out;
01596 unsigned int bytesToSend = 0;
01597 MsgBuf msg1(NUMERIC_MESSAGE_SIZE,MSQS_MESSAGE_TYPE_WEB);
// The request text has the form "<method>[&<args>]".
01598 std::string query = slave_message_monitoring_->mtext;
01599 size_t pos = query.find_first_of("&");
01600 std::string method;
01601 std::string args;
01602 if(pos!=std::string::npos)
01603 {
01604 method = query.substr(0,pos);
01605 args = query.substr(pos+1,query.length()-pos-1);
01606 }
01607 else
01608 method=query;
01609
01610 if(method=="Spotlight")
01611 {
01612 spotlightWebPage(in,&out);
01613 }
01614 else if(method=="procStat")
01615 {
01616 procStat(in,&out);
01617 }
01618 else if(method=="moduleWeb")
01619 {
// args is a ';'-separated list of "name%value" pairs that are loaded into the
// CGI environment handed to moduleWeb(). Tokens without '%' are ignored.
01620 internal::MyCgi mycgi;
01621 boost::char_separator<char> sep(";");
01622 boost::tokenizer<boost::char_separator<char> > tokens(args, sep);
01623 for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
01624 tok_iter != tokens.end(); ++tok_iter){
// This inner `pos` shadows the outer one declared above.
01625 size_t pos = (*tok_iter).find_first_of("%");
01626 if(pos != std::string::npos){
01627 std::string first = (*tok_iter).substr(0 , pos);
01628 std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
01629 mycgi.getEnvironment()[first]=second;
01630 }
01631 }
01632 moduleWeb(&mycgi,&out);
01633 }
01634 else if(method=="Default")
01635 {
01636 defaultWebPage(in,&out);
01637 }
01638 else
01639 {
01640 out << "Error 404!!!!!!!!" << std::endl;
01641 }
01642
01643
01644 bytesToSend = out.str().size();
01645 unsigned int cycle = 0;
// An empty page is still announced, with a size of 0.
01646 if(bytesToSend==0)
01647 {
// NOTE(review): "%d" is used with an unsigned int argument here and below.
01648 snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", bytesToSend);
01649 myProcess_->postSlave(msg1,true);
01650 }
// Send the page through the pipe in chunks of at most MAX_PIPE_BUFFER_SIZE
// bytes, posting one size message per chunk.
01651 while(bytesToSend !=0){
01652 unsigned int msgSize = bytesToSend>MAX_PIPE_BUFFER_SIZE ? MAX_PIPE_BUFFER_SIZE : bytesToSend;
// NOTE(review): the return value of write() is ignored (short writes or
// errors go unnoticed), and out.str() is evaluated again on every chunk.
01653 write(anonymousPipe_[PIPE_WRITE],
01654 out.str().c_str()+MAX_PIPE_BUFFER_SIZE*cycle,
01655 msgSize);
01656 snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", msgSize);
01657 myProcess_->postSlave(msg1,true);
01658 bytesToSend -= msgSize;
01659 cycle++;
01660 }
01661 break;
01662 }
01663 case MSQM_MESSAGE_TYPE_TRP:
01664 {
01665 break;
01666 }
01667 }
01668 }
01669 catch(evf::Exception &e){std::cout << "exception caught in recevingM: " << e.what() << std::endl;}
01670 return true;
01671 }
01672
01673
01674 bool FUEventProcessor::enableCommon()
01675 {
01676 try {
01677 if(hasShMem_) attachDqmToShm();
01678 int sc = 0;
01679 evtProcessor_->clearCounters();
01680 if(isRunNumberSetter_)
01681 evtProcessor_->setRunNumber(runNumber_.value_);
01682 else
01683 evtProcessor_->declareRunNumber(runNumber_.value_);
01684
01685 try{
01686 ::sleep(1);
01687 evtProcessor_->runAsync();
01688 sc = evtProcessor_->statusAsync();
01689 }
01690 catch(cms::Exception &e) {
01691 reasonForFailedState_ = e.explainSelf();
01692 fsm_.fireFailed(reasonForFailedState_,this);
01693 localLog(reasonForFailedState_);
01694 return false;
01695 }
01696 catch(std::exception &e) {
01697 reasonForFailedState_ = e.what();
01698 fsm_.fireFailed(reasonForFailedState_,this);
01699 localLog(reasonForFailedState_);
01700 return false;
01701 }
01702 catch(...) {
01703 reasonForFailedState_ = "Unknown Exception";
01704 fsm_.fireFailed(reasonForFailedState_,this);
01705 localLog(reasonForFailedState_);
01706 return false;
01707 }
01708 if(sc != 0) {
01709 std::ostringstream oss;
01710 oss<<"EventProcessor::runAsync returned status code " << sc;
01711 reasonForFailedState_ = oss.str();
01712 fsm_.fireFailed(reasonForFailedState_,this);
01713 localLog(reasonForFailedState_);
01714 return false;
01715 }
01716 }
01717 catch (xcept::Exception &e) {
01718 reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
01719 fsm_.fireFailed(reasonForFailedState_,this);
01720 localLog(reasonForFailedState_);
01721 return false;
01722 }
01723 try{
01724 fsm_.fireEvent("EnableDone",this);
01725 }
01726 catch (xcept::Exception &e) {
01727 std::cout << "exception " << (std::string)e.what() << std::endl;
01728 throw;
01729 }
01730
01731 return false;
01732 }
01733
01734 void FUEventProcessor::forkProcessFromEDM_helper(void * addr) {
01735 ((FUEventProcessor*)addr)->forkProcessesFromEDM();
01736 }
01737
01738 void FUEventProcessor::forkProcessesFromEDM() {
01739
01740 moduleweb::ForkParams * forkParams = &(forkInfoObj_->forkParams);
01741 unsigned int forkFrom=0;
01742 unsigned int forkTo=nbSubProcesses_.value_;
01743 if (forkParams->slotId>=0) {
01744 forkFrom=forkParams->slotId;
01745 forkTo=forkParams->slotId+1;
01746 }
01747
01748
01749 try {
01750 if (sorRef_) {
01751 std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
01752 for (unsigned int i=0;i<shmOutputs.size();i++) {
01753
01754 shmOutputs[i]->unregisterFromShm();
01755
01756 shmOutputs[i]->stop();
01757 }
01758 }
01759 }
01760 catch (std::exception &e)
01761 {
01762 reasonForFailedState_ = (std::string)"Thrown exception while disconnecting ShmOutputModule from Shm: " + e.what();
01763 LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
01764 fsm_.fireFailed(reasonForFailedState_,this);
01765 localLog(reasonForFailedState_);
01766 }
01767 catch (...) {
01768 reasonForFailedState_ = "Thrown unknown exception while disconnecting ShmOutputModule from Shm: ";
01769 LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
01770 fsm_.fireFailed(reasonForFailedState_,this);
01771 localLog(reasonForFailedState_);
01772 }
01773
01774
01775 for(unsigned int i=forkFrom; i<forkTo; i++)
01776 {
01777
01778 int retval = subs_[i].forkNew();
01779 if(retval==0)
01780 {
01781
01782 forkParams->isMaster=0;
01783 myProcess_ = &subs_[i];
01784
01785 delete toolbox::mem::_s_mutex_ptr_;
01786 toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
01787 int retval = pthread_mutex_destroy(&stop_lock_);
01788 if(retval != 0) perror("error");
01789 retval = pthread_mutex_init(&stop_lock_,0);
01790 if(retval != 0) perror("error");
01791 fsm_.disableRcmsStateNotification();
01792
01793 try{
01794 LOG4CPLUS_DEBUG(getApplicationLogger(),
01795 "Trying to create message service presence ");
01796
01797 edm::PresenceFactory *pf = edm::PresenceFactory::get();
01798 if(pf != 0) {
01799 pf->makePresence("MessageServicePresence").release();
01800 }
01801 else {
01802 LOG4CPLUS_ERROR(getApplicationLogger(),
01803 "Unable to create message service presence ");
01804 }
01805 }
01806 catch(...) {
01807 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception in MessageServicePresence");
01808 }
01809
01810 ML::MLlog4cplus::setAppl(this);
01811
01812
01813 try {
01814 if (sorRef_) {
01815 std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
01816 for (unsigned int i=0;i<shmOutputs.size();i++)
01817 shmOutputs[i]->start();
01818 }
01819 }
01820 catch (...)
01821 {
01822 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception (ShmOutputModule sending InitMsg (pid:"<<getpid() <<")");
01823 }
01824
01825 if (forkParams->restart) {
01826
01827 scalersUpdates_ = 0;
01828 fsm_.fireEvent("Stop",this);
01829 fsm_.fireEvent("StopDone",this);
01830 fsm_.fireEvent("Enable",this);
01831 try{
01832 xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
01833 if(lsid) {
01834 ((xdata::UnsignedInteger32*)(lsid))->value_--;
01835 }
01836 }
01837 catch(...){
01838 std::cout << "trouble with lsindex during restart" << std::endl;
01839 }
01840 try{
01841 xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
01842 if(lstb) {
01843 ((xdata::Boolean*)(lstb))->value_ = false;
01844 }
01845 }
01846 catch(...){
01847 std::cout << "trouble with resetting flag for eol recovery " << std::endl;
01848 }
01849 evtProcessor_.adjustLsIndexForRestart();
01850 evtProcessor_.resetPackedTriggerReport();
01851 }
01852
01853
01854 startReceivingLoop();
01855 startReceivingMonitorLoop();
01856 startScalersWorkLoop();
01857 while(!evtProcessor_.isWaitingForLs())
01858 ::usleep(100000);
01859
01860
01861 if(hasShMem_) attachDqmToShm();
01862
01863
01864 try {
01865 fsm_.fireEvent("EnableDone",this);
01866 }
01867 catch (...) {}
01868
01869 return ;
01870 }
01871 else {
01872 forkParams->isMaster=1;
01873 forkInfoObj_->forkParams.slotId=-1;
01874 forkInfoObj_->forkParams.restart=0;
01875 if (forkParams->restart) {
01876 std::ostringstream ost1;
01877 ost1 << "-I- New Process " << retval << " forked for slot " << forkParams->slotId;
01878 localLog(ost1.str());
01879 }
01880 }
01881 }
01882 edm_init_done_=true;
01883 }
01884
01885 bool FUEventProcessor::enableForkInEDM()
01886 {
01887 evtProcessor_.resetWaiting();
01888 try {
01889
01890
01891
01892 int sc = 0;
01893
01894 evtProcessor_->clearCounters();
01895 if(isRunNumberSetter_)
01896 evtProcessor_->setRunNumber(runNumber_.value_);
01897 else
01898 evtProcessor_->declareRunNumber(runNumber_.value_);
01899
01900
01901 pthread_mutex_destroy(&forkObjLock_);
01902 pthread_mutex_init(&forkObjLock_,0);
01903 if (forkInfoObj_) delete forkInfoObj_;
01904 forkInfoObj_ = new moduleweb::ForkInfoObj();
01905 forkInfoObj_->mst_lock_=&forkObjLock_;
01906 forkInfoObj_->fuAddr=(void*)this;
01907 forkInfoObj_->forkHandler = forkProcessFromEDM_helper;
01908 forkInfoObj_->forkParams.slotId=-1;
01909 forkInfoObj_->forkParams.restart=0;
01910 forkInfoObj_->forkParams.isMaster=-1;
01911 forkInfoObj_->stopCondition=0;
01912 if (mwrRef_)
01913 mwrRef_->publishForkInfo(std::string("DaqSource"),forkInfoObj_);
01914
01915 evtProcessor_->runAsync();
01916 sc = evtProcessor_->statusAsync();
01917
01918 if(sc != 0) {
01919 std::ostringstream oss;
01920 oss<<"EventProcessor::runAsync returned status code " << sc;
01921 reasonForFailedState_ = oss.str();
01922 fsm_.fireFailed(reasonForFailedState_,this);
01923 LOG4CPLUS_FATAL(log_,reasonForFailedState_);
01924 return false;
01925 }
01926 }
01927
01928 catch(cms::Exception &e) {
01929 reasonForFailedState_ = e.explainSelf();
01930 fsm_.fireFailed(reasonForFailedState_,this);
01931 LOG4CPLUS_FATAL(log_,reasonForFailedState_);
01932 return false;
01933 }
01934 catch(std::exception &e) {
01935 reasonForFailedState_ = e.what();
01936 fsm_.fireFailed(reasonForFailedState_,this);
01937 LOG4CPLUS_FATAL(log_,reasonForFailedState_);
01938 return false;
01939 }
01940 catch(...) {
01941 reasonForFailedState_ = "Unknown Exception";
01942 fsm_.fireFailed(reasonForFailedState_,this);
01943 LOG4CPLUS_FATAL(log_,reasonForFailedState_);
01944 return false;
01945 }
01946 return true;
01947 }
01948
01949 bool FUEventProcessor::restartForkInEDM(unsigned int slotId) {
01950
01951
01952 forkInfoObj_->lock();
01953 forkInfoObj_->forkParams.slotId=slotId;
01954 forkInfoObj_->forkParams.restart=true;
01955 forkInfoObj_->forkParams.isMaster=1;
01956 forkInfoObj_->stopCondition=0;
01957 LOG4CPLUS_DEBUG(log_, " restarting subprocess in slot "<< slotId <<": posting on control semaphore");
01958 sem_post(forkInfoObj_->control_sem_);
01959 forkInfoObj_->unlock();
01960 usleep(1000);
01961 return true;
01962 }
01963
01964 bool FUEventProcessor::enableClassic()
01965 {
01966 bool retval = enableCommon();
01967 while(evtProcessor_->getState()!= edm::event_processor::sRunning){
01968 LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
01969 ::sleep(1);
01970 }
01971
01972
01973
01974 localLog("-I- Start completed");
01975 return retval;
01976 }
01977 bool FUEventProcessor::enableMPEPSlave()
01978 {
01979
01980
01981 startReceivingLoop();
01982 startReceivingMonitorLoop();
01983 evtProcessor_.resetWaiting();
01984 startScalersWorkLoop();
01985 while(!evtProcessor_.isWaitingForLs())
01986 ::sleep(1);
01987
01988
01989
01990 try{
01991
01992 try{
01993 LOG4CPLUS_DEBUG(getApplicationLogger(),
01994 "Trying to create message service presence ");
01995 edm::PresenceFactory *pf = edm::PresenceFactory::get();
01996 if(pf != 0) {
01997 pf->makePresence("MessageServicePresence").release();
01998 }
01999 else {
02000 LOG4CPLUS_ERROR(getApplicationLogger(),
02001 "Unable to create message service presence ");
02002 }
02003 }
02004 catch(...) {
02005 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
02006 }
02007 ML::MLlog4cplus::setAppl(this);
02008 }
02009 catch (xcept::Exception &e) {
02010 reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
02011 fsm_.fireFailed(reasonForFailedState_,this);
02012 localLog(reasonForFailedState_);
02013 }
02014 bool retval = enableCommon();
02015
02016
02017
02018
02019 return retval;
02020 }
02021
02022 bool FUEventProcessor::stopClassic()
02023 {
02024 try {
02025 LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping :) ...");
02026 edm::EventProcessor::StatusCode rc = evtProcessor_.stop();
02027 if(rc == edm::EventProcessor::epSuccess)
02028 fsm_.fireEvent("StopDone",this);
02029 else
02030 {
02031
02032 if(rc == edm::EventProcessor::epTimedOut)
02033 reasonForFailedState_ = "EventProcessor stop timed out";
02034 else
02035 reasonForFailedState_ = "EventProcessor did not receive STOP event";
02036 fsm_.fireFailed(reasonForFailedState_,this);
02037 localLog(reasonForFailedState_);
02038 }
02039 if(hasShMem_) detachDqmFromShm();
02040 }
02041 catch (xcept::Exception &e) {
02042 reasonForFailedState_ = "stopping FAILED: " + (std::string)e.what();
02043 localLog(reasonForFailedState_);
02044 fsm_.fireFailed(reasonForFailedState_,this);
02045 }
02046 LOG4CPLUS_INFO(getApplicationLogger(),"Finished stopping!");
02047 localLog("-I- Stop completed");
02048 return false;
02049 }
02050
02051 void FUEventProcessor::stopSlavesAndAcknowledge()
02052 {
02053 MsgBuf msg(0,MSQM_MESSAGE_TYPE_STOP);
02054 MsgBuf msg1(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_STOP);
02055
02056 std::vector<bool> processes_to_stop(nbSubProcesses_.value_,false);
02057 for(unsigned int i = 0; i < subs_.size(); i++)
02058 {
02059 pthread_mutex_lock(&stop_lock_);
02060 if(subs_[i].alive()>0){
02061 processes_to_stop[i] = true;
02062 subs_[i].post(msg,false);
02063 }
02064 pthread_mutex_unlock(&stop_lock_);
02065 }
02066 for(unsigned int i = 0; i < subs_.size(); i++)
02067 {
02068 pthread_mutex_lock(&stop_lock_);
02069 if(processes_to_stop[i]){
02070 try{
02071 subs_[i].rcv(msg1,false);
02072 }
02073 catch(evf::Exception &e){
02074 std::ostringstream ost;
02075 ost << "failed to get STOP - errno ->" << errno << " " << e.what();
02076 reasonForFailedState_ = ost.str();
02077 LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02078
02079 localLog(reasonForFailedState_);
02080 pthread_mutex_unlock(&stop_lock_);
02081 continue;
02082 }
02083 }
02084 else {
02085 pthread_mutex_unlock(&stop_lock_);
02086 continue;
02087 }
02088 pthread_mutex_unlock(&stop_lock_);
02089 if(msg1->mtype==MSQS_MESSAGE_TYPE_STOP)
02090 while(subs_[i].alive()>0) ::usleep(10000);
02091 subs_[i].disconnect();
02092 }
02093
02094
02095 }
02096
02097 void FUEventProcessor::microState(xgi::Input *in,xgi::Output *out)
02098 {
02099 std::string urn = getApplicationDescriptor()->getURN();
02100 try{
02101 evtProcessor_.stateNameFromIndex(0);
02102 evtProcessor_.moduleNameFromIndex(0);
02103 if(myProcess_) {std::cout << "microstate called for child! bail out" << std::endl; return;}
02104 *out << "<tr><td>" << fsm_.stateName()->toString()
02105 << "</td><td>"<< (myProcess_ ? "S" : "M") <<"</td><td>" << nblive_ << "</td><td>"
02106 << nbdead_ << "</td><td><a href=\"/" << urn << "/procStat\">" << getpid() <<"</a></td>";
02107 evtProcessor_.microState(in,out);
02108 *out << "<td></td><td>" << nbTotalDQM_
02109 << "</td><td>" << evtProcessor_.getScalersUpdates() << "</td></tr>";
02110 if(nbSubProcesses_.value_!=0 && !myProcess_)
02111 {
02112 pthread_mutex_lock(&start_lock_);
02113 for(unsigned int i = 0; i < subs_.size(); i++)
02114 {
02115 try{
02116 if(subs_[i].alive()>0)
02117 {
02118 *out << "<tr><td bgcolor=\"#00FF00\" id=\"a"
02119 << i << "\">""Alive</td><td>S</td><td>"
02120 << subs_[i].queueId() << "<td>"
02121 << subs_[i].queueStatus()<< "/"
02122 << subs_[i].queueOccupancy() << "/"
02123 << subs_[i].queuePidOfLastSend() << "/"
02124 << subs_[i].queuePidOfLastReceive()
02125 << "</td><td><a id=\"p"<< i << "\" href=\"SubWeb?process="
02126 << subs_[i].pid() << "&method=procStat\">"
02127 << subs_[i].pid()<<"</a></td>"
02128 << "<td>" << evtProcessor_.stateNameFromIndex(subs_[i].params().Ms) << "</td><td>"
02129 << evtProcessor_.moduleNameFromIndex(subs_[i].params().ms) << "</td><td>"
02130 << subs_[i].params().nba << "/" << subs_[i].params().nbp
02131 << " (" << float(subs_[i].params().nba)/float(subs_[i].params().nbp)*100. <<"%)"
02132 << "</td><td>" << subs_[i].params().ls << "/" << subs_[i].params().ls
02133 << "</td><td>" << subs_[i].params().ps
02134 << "</td><td"
02135 << ((subs_[i].params().eols<subs_[i].params().ls) ? " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"") << ">"
02136 << subs_[i].params().eols
02137 << "</td><td>" << subs_[i].params().dqm
02138 << "</td><td>" << subs_[i].params().trp << "</td>";
02139 }
02140 else
02141 {
02142 pthread_mutex_lock(&pickup_lock_);
02143 *out << "<tr><td id=\"a"<< i << "\" ";
02144 if(subs_[i].alive()==-1000)
02145 *out << " bgcolor=\"#bbaabb\">NotInitialized";
02146 else
02147 *out << (subs_[i].alive()==0 ? ">Done" : " bgcolor=\"#FF0000\">Dead");
02148 *out << "</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
02149 << subs_[i].queueStatus() << "/"
02150 << subs_[i].queueOccupancy() << "/"
02151 << subs_[i].queuePidOfLastSend() << "/"
02152 << subs_[i].queuePidOfLastReceive()
02153 << "</td><td id=\"p"<< i << "\">"
02154 <<subs_[i].pid()<<"</td><td colspan=\"5\">" << subs_[i].reasonForFailed();
02155 if(subs_[i].alive()!=0 && subs_[i].alive()!=-1000)
02156 {
02157 if(autoRestartSlaves_ && subs_[i].restartCount()<=2)
02158 *out << " will restart in " << subs_[i].countdown() << " s";
02159 else if(autoRestartSlaves_)
02160 *out << " reached maximum restart count";
02161 else *out << " autoRestart is disabled ";
02162 }
02163 *out << "</td><td"
02164 << ((subs_[i].params().eols<subs_[i].params().ls) ?
02165 " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")
02166 << ">"
02167 << subs_[i].params().eols
02168 << "</td><td>" << subs_[i].params().dqm
02169 << "</td><td>" << subs_[i].params().trp << "</td>";
02170 pthread_mutex_unlock(&pickup_lock_);
02171 }
02172 *out << "</tr>";
02173 }
02174 catch(evf::Exception &e){
02175 *out << "<tr><td id=\"a"<< i << "\" "
02176 <<"bgcolor=\"#FFFF00\">NotResp</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
02177 << subs_[i].queueStatus() << "/"
02178 << subs_[i].queueOccupancy() << "/"
02179 << subs_[i].queuePidOfLastSend() << "/"
02180 << subs_[i].queuePidOfLastReceive()
02181 << "</td><td id=\"p"<< i << "\">"
02182 <<subs_[i].pid()<<"</td>";
02183 }
02184 }
02185 pthread_mutex_unlock(&start_lock_);
02186 }
02187 }
02188 catch(evf::Exception &e)
02189 {
02190 LOG4CPLUS_INFO(getApplicationLogger(),"evf::Exception caught in microstate - " << e.what());
02191 }
02192 catch(cms::Exception &e)
02193 {
02194 LOG4CPLUS_INFO(getApplicationLogger(),"cms::Exception caught in microstate - " << e.what());
02195 }
02196 catch(std::exception &e)
02197 {
02198 LOG4CPLUS_INFO(getApplicationLogger(),"std::Exception caught in microstate - " << e.what());
02199 }
02200 catch(...)
02201 {
02202 LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception caught in microstate - ");
02203 }
02204
02205 }
02206
02207
02208 void FUEventProcessor::updater(xgi::Input *in,xgi::Output *out)
02209 {
02210 using namespace utils;
02211
02212 *out << updaterStatic_;
02213 mDiv(out,"loads");
02214 uptime(out);
02215 cDiv(out);
02216 mDiv(out,"st",fsm_.stateName()->toString());
02217 mDiv(out,"ru",runNumber_.toString());
02218 mDiv(out,"nsl",nbSubProcesses_.value_);
02219 mDiv(out,"nsr",nbSubProcessesReporting_.value_);
02220 mDiv(out,"cl");
02221 *out << getApplicationDescriptor()->getClassName()
02222 << (nbSubProcesses_.value_ > 0 ? "MP " : " ");
02223 cDiv(out);
02224 mDiv(out,"in",getApplicationDescriptor()->getInstance());
02225 if(fsm_.stateName()->toString() != "Halted" && fsm_.stateName()->toString() != "halting"){
02226 mDiv(out,"hlt");
02227 *out << "<a href=\"" << configString_.toString() << "\">HLT Config</a>";
02228 cDiv(out);
02229 *out << std::endl;
02230 }
02231 else
02232 mDiv(out,"hlt","Not yet...");
02233
02234 mDiv(out,"sq",squidPresent_.toString());
02235 mDiv(out,"vwl",(supervising_ ? "Active" : "Not Initialized"));
02236 mDiv(out,"mwl",evtProcessor_.wlMonitoring());
02237 if(nbProcessed != 0 && nbAccepted != 0)
02238 {
02239 mDiv(out,"tt",((xdata::UnsignedInteger32*)nbProcessed)->value_);
02240 mDiv(out,"ac",((xdata::UnsignedInteger32*)nbAccepted)->value_);
02241 }
02242 else
02243 {
02244 mDiv(out,"tt",0);
02245 mDiv(out,"ac",0);
02246 }
02247 if(!myProcess_)
02248 mDiv(out,"swl",(wlSummarizeActive_ ? "Active" : "Inactive"));
02249 else
02250 mDiv(out,"swl",(wlScalersActive_ ? "Active" : "Inactive"));
02251
02252 mDiv(out,"idi",iDieUrl_.value_);
02253 if(vp_!=0){
02254 mDiv(out,"vpi",(unsigned int) vp_);
02255 if(vulture_->hasStarted()>=0)
02256 mDiv(out,"vul","Prowling");
02257 else
02258 mDiv(out,"vul","Dead");
02259 }
02260 else{
02261 mDiv(out,"vul",(vulture_==0 ? "Nope" : "Hatching"));
02262 }
02263 if(evtProcessor_){
02264 mDiv(out,"ll");
02265 *out << evtProcessor_.lastLumi().ls
02266 << "," << evtProcessor_.lastLumi().proc << "," << evtProcessor_.lastLumi().acc;
02267 cDiv(out);
02268 }
02269 mDiv(out,"lg");
02270 for(unsigned int i = logRingIndex_; i<logRingSize_; i++)
02271 *out << logRing_[i] << std::endl;
02272 if(logWrap_)
02273 for(unsigned int i = 0; i<logRingIndex_; i++)
02274 *out << logRing_[i] << std::endl;
02275 cDiv(out);
02276 mDiv(out,"cha");
02277 if(cpustat_) *out << cpustat_->getChart();
02278 cDiv(out);
02279 }
02280
02281 void FUEventProcessor::procStat(xgi::Input *in, xgi::Output *out)
02282 {
02283 evf::utils::procStat(out);
02284 }
02285
02286 void FUEventProcessor::sendMessageOverMonitorQueue(MsgBuf &buf)
02287 {
02288 if(myProcess_) myProcess_->postSlave(buf,true);
02289 }
02290
02291 void FUEventProcessor::makeStaticInfo()
02292 {
02293 using namespace utils;
02294 std::ostringstream ost;
02295 mDiv(&ost,"ve");
02296 ost<< "$Revision: 1.134.2.6 $ (" << edm::getReleaseVersion() <<")";
02297 cDiv(&ost);
02298 mDiv(&ost,"ou",outPut_.toString());
02299 mDiv(&ost,"sh",hasShMem_.toString());
02300 mDiv(&ost,"mw",hasModuleWebRegistry_.toString());
02301 mDiv(&ost,"sw",hasServiceWebRegistry_.toString());
02302
02303 xdata::Serializable *monsleep = 0;
02304 xdata::Serializable *lstimeout = 0;
02305 try{
02306 monsleep = applicationInfoSpace_->find("monSleepSec");
02307 lstimeout = applicationInfoSpace_->find("lsTimeOut");
02308 }
02309 catch(xdata::exception::Exception e){
02310 }
02311
02312 if(monsleep!=0)
02313 mDiv(&ost,"ms",monsleep->toString());
02314 if(lstimeout!=0)
02315 mDiv(&ost,"lst",lstimeout->toString());
02316 char cbuf[sizeof(struct utsname)];
02317 struct utsname* buf = (struct utsname*)cbuf;
02318 uname(buf);
02319 mDiv(&ost,"sysinfo");
02320 ost << buf->sysname << " " << buf->nodename
02321 << " " << buf->release << " " << buf->version << " " << buf->machine;
02322 cDiv(&ost);
02323 updaterStatic_ = ost.str();
02324 }
02325
// XDAQ instantiator macro, invoked for evf::FUEventProcessor.
// NOTE(review): presumably generates the factory entry point the XDAQ
// framework uses to create this application - confirm against the XDAQ headers.
02326 XDAQ_INSTANTIATOR_IMPL(evf::FUEventProcessor)