00001
00002
00003
00004
00005
00006
00007
00008
00009
00011
00012 #include "EventFilter/ResourceBroker/interface/SharedResources.h"
00013
00014 #include <signal.h>
00015 #include <iostream>
00016
00017 using std::string;
00018 using std::vector;
00019 using std::cout;
00020 using std::endl;
00021
00022 using namespace evf::rb_statemachine;
00023
00024 SharedResources::SharedResources(Logger log) :
00025 wlMonitoring_(0),
00026 asMonitoring_(0),
00027 wlWatching_(0),
00028 asWatching_(0),
00029 wlSendData_(0),
00030 asSendData_(0),
00031 wlSendDqm_(0),
00032 asSendDqm_(0),
00033 wlDiscard_(0),
00034 asDiscard_(0),
00035 gui_(0),
00036 commands_(CommandQueue()),
00037 log_(log),
00038 bu_(0),
00039 sm_(0),
00040 i2oPool_(0),
00041 ipcManager_(0),
00042 resourceStructure_(0),
00043 runNumber_(0),
00044 deltaT_(0.0),
00045 deltaN_(0),
00046 deltaSumOfSquares_(0),
00047 deltaSumOfSizes_(0),
00048 throughput_(0.0),
00049 rate_(0.0),
00050 average_(0.0),
00051 rms_(0.0),
00052 nbAllocatedEvents_(0),
00053 nbPendingRequests_(0),
00054 nbReceivedEvents_(0),
00055 nbSentEvents_(0),
00056 nbSentDqmEvents_(0),
00057 nbSentErrorEvents_(0),
00058 nbPendingSMDiscards_(0),
00059 nbPendingSMDqmDiscards_(0),
00060 nbDiscardedEvents_(0),
00061
00062 nbReceivedEol_(0),
00063 highestEolReceived_(0),
00064 nbEolPosted_(0),
00065 nbEolDiscarded_(0),
00066 nbLostEvents_(0),
00067 nbDataErrors_(0),
00068 nbCrcErrors_(0),
00069 nbTimeoutsWithEvent_(0),
00070 nbTimeoutsWithoutEvent_(0),
00071 dataErrorFlag_(0),
00072 segmentationMode_(false),
00073 useMessageQueueIPC_(false),
00074 nbClients_(0),
00075 clientPrcIds_(""),
00076 nbRawCells_(16),
00077 nbRecoCells_(8),
00078 nbDqmCells_(8),
00079 rawCellSize_(0x400000)
00080 ,
00081 recoCellSize_(0x800000)
00082 ,
00083 dqmCellSize_(0x800000)
00084
00085 , freeResRequiredForAllocate_(-1), doDropEvents_(false),
00086 doFedIdCheck_(true), doCrcCheck_(1), doDumpEvents_(0),
00087 buClassName_("BU"), buInstance_(0), smClassName_("StorageManager"),
00088 smInstance_(0), resourceStructureTimeout_(200000), monSleepSec_(2),
00089 watchSleepSec_(10), timeOutSec_(30), processKillerEnabled_(true),
00090 useEvmBoard_(true), reasonForFailed_(""), nbAllocateSent_(0),
00091 nbTakeReceived_(0), nbDataDiscardReceived_(0),
00092 nbDqmDiscardReceived_(0), nbSentLast_(0), sumOfSquaresLast_(0),
00093 sumOfSizesLast_(0), frb_(0), shmInconsistent_(false),
00094 allowI2ODiscards_(true) {
00095
00096 sem_init(&lock_, 0, 1);
00097 sem_init(&accessToResourceStructureLock_, 0, 1);
00098
00099 }
00100
00101 SharedResources::~SharedResources() {
00102
00103 }
00104
00105
00106 void SharedResources::configureResources(xdaq::Application* app) {
00107
00108 ipcManager_ = new IPCManager(useMessageQueueIPC_);
00109
00110 ipcManager_->initialise(segmentationMode_.value_, nbRawCells_.value_,
00111 nbRecoCells_.value_, nbDqmCells_.value_, rawCellSize_.value_,
00112 recoCellSize_.value_, dqmCellSize_.value_,
00113 freeResRequiredForAllocate_, bu_, sm_, log_,
00114 resourceStructureTimeout_.value_, frb_, app);
00115
00116 resourceStructure_ = ipcManager_->ipc();
00117
00118 FUResource::doFedIdCheck(doFedIdCheck_);
00119 FUResource::useEvmBoard(useEvmBoard_);
00120 resourceStructure_->setDoCrcCheck(doCrcCheck_);
00121 resourceStructure_->setDoDumpEvents(doDumpEvents_);
00122 reset();
00123 shmInconsistent_ = false;
00124
00125
00126 if (resourceStructure_->nbResources() != nbRawCells_.value_
00127 || resourceStructure_->nbFreeSlots() != nbRawCells_.value_)
00128 shmInconsistent_ = true;
00129 }
00130
00131
00132 void SharedResources::reset() {
00133
00134 gui_->resetCounters();
00135
00136 deltaT_ = 0.0;
00137 deltaN_ = 0;
00138 deltaSumOfSquares_ = 0.0;
00139 deltaSumOfSizes_ = 0;
00140
00141 throughput_ = 0.0;
00142 rate_ = 0.0;
00143 average_ = 0.0;
00144 rms_ = 0.0;
00145
00146 nbSentLast_ = 0;
00147 sumOfSquaresLast_ = 0;
00148 sumOfSizesLast_ = 0;
00149 }
00150
00151
00152 void SharedResources::cancelAllWorkloops() {
00153 if (wlSendData_) {
00154 wlSendData_->cancel();
00155 toolbox::task::getWorkLoopFactory()->removeWorkLoop("SendData",
00156 "waiting");
00157 }
00158 if (wlSendDqm_) {
00159 wlSendDqm_->cancel();
00160 toolbox::task::getWorkLoopFactory()->removeWorkLoop("SendDqm",
00161 "waiting");
00162 }
00163 if (wlDiscard_) {
00164 wlDiscard_->cancel();
00165 toolbox::task::getWorkLoopFactory()->removeWorkLoop("Discard",
00166 "waiting");
00167 }
00168
00169 if (wlMonitoring_) {
00170 wlMonitoring_->cancel();
00171 toolbox::task::getWorkLoopFactory()->removeWorkLoop("Monitoring",
00172 "waiting");
00173 }
00174 if (wlWatching_) {
00175 wlWatching_->cancel();
00176 toolbox::task::getWorkLoopFactory()->removeWorkLoop("Watching",
00177 "waiting");
00178 }
00179 }
00180
00181
00182 void SharedResources::startMonitoringWorkLoop() throw (evf::Exception) {
00183
00184 struct timezone timezone;
00185 gettimeofday(&monStartTime_, &timezone);
00186
00187 try {
00188 wlMonitoring_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
00189 sourceId_ + "Monitoring", "waiting");
00190 if (!wlMonitoring_->isActive())
00191 wlMonitoring_->activate();
00192 asMonitoring_ = toolbox::task::bind(this, &SharedResources::monitoring,
00193 sourceId_ + "Monitoring");
00194 wlMonitoring_->submit(asMonitoring_);
00195 } catch (xcept::Exception& e) {
00196 string msg = "Failed to start workloop 'Monitoring'.";
00197 XCEPT_RETHROW(evf::Exception, msg, e);
00198 }
00199 }
00200
00201
00202 bool SharedResources::monitoring(toolbox::task::WorkLoop*) {
00203
00204 unsigned int nbSent;
00205 uint64_t sumOfSquares;
00206 unsigned int sumOfSizes;
00207 uint64_t deltaSumOfSquares;
00208
00209 lock();
00210 if (0 == resourceStructure_) {
00211 deltaT_.value_ = 0.0;
00212 deltaN_.value_ = 0;
00213 deltaSumOfSquares_.value_ = 0.0;
00214 deltaSumOfSizes_.value_ = 0;
00215 throughput_ = 0.0;
00216 rate_ = 0.0;
00217 average_ = 0.0;
00218 rms_ = 0.0;
00219 unlock();
00220 return false;
00221 } else {
00222 nbSent = resourceStructure_->nbSent();
00223 sumOfSquares = resourceStructure_->sumOfSquares();
00224 sumOfSizes = resourceStructure_->sumOfSizes();
00225 }
00226 unlock();
00227
00228 struct timeval monEndTime;
00229 struct timezone timezone;
00230
00231 gettimeofday(&monEndTime, &timezone);
00232
00233 xdata::getInfoSpaceFactory()->lock();
00234 gui_->monInfoSpace()->lock();
00235
00236 deltaT_.value_ = deltaT(&monStartTime_, &monEndTime);
00237 monStartTime_ = monEndTime;
00238
00239 deltaN_.value_ = nbSent - nbSentLast_;
00240 nbSentLast_ = nbSent;
00241
00242 deltaSumOfSquares = sumOfSquares - sumOfSquaresLast_;
00243 deltaSumOfSquares_.value_ = (double) deltaSumOfSquares;
00244 sumOfSquaresLast_ = sumOfSquares;
00245
00246 deltaSumOfSizes_.value_ = sumOfSizes - sumOfSizesLast_;
00247 sumOfSizesLast_ = sumOfSizes;
00248
00249 if (deltaT_.value_ != 0) {
00250 throughput_ = deltaSumOfSizes_.value_ / deltaT_.value_;
00251 rate_ = deltaN_.value_ / deltaT_.value_;
00252 } else {
00253 throughput_ = 0.0;
00254 rate_ = 0.0;
00255 }
00256
00257 double meanOfSquares, mean, squareOfMean, variance;
00258
00259 if (deltaN_.value_ != 0) {
00260 meanOfSquares = deltaSumOfSquares_.value_ / ((double) (deltaN_.value_));
00261 mean = ((double) (deltaSumOfSizes_.value_))
00262 / ((double) (deltaN_.value_));
00263 squareOfMean = mean * mean;
00264 variance = meanOfSquares - squareOfMean;
00265 if (variance < 0.0)
00266 variance = 0.0;
00267
00268 average_ = deltaSumOfSizes_.value_ / deltaN_.value_;
00269 rms_ = std::sqrt(variance);
00270 } else {
00271 average_ = 0.0;
00272 rms_ = 0.0;
00273 }
00274
00275 gui_->monInfoSpace()->unlock();
00276 xdata::getInfoSpaceFactory()->unlock();
00277
00278 ::sleep(monSleepSec_.value_);
00279
00280 return true;
00281 }
00282
00283
00284 void SharedResources::startWatchingWorkLoop() throw (evf::Exception) {
00285 try {
00286 wlWatching_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
00287 sourceId_ + "Watching", "waiting");
00288 if (!wlWatching_->isActive())
00289 wlWatching_->activate();
00290 asWatching_ = toolbox::task::bind(this, &SharedResources::watching,
00291 sourceId_ + "Watching");
00292 wlWatching_->submit(asWatching_);
00293 } catch (xcept::Exception& e) {
00294 string msg = "Failed to start workloop 'Watching'.";
00295 XCEPT_RETHROW(evf::Exception, msg, e);
00296 }
00297 }
00298
00299
00300 bool SharedResources::watching(toolbox::task::WorkLoop*) {
00301 lock();
00302 if (0 == resourceStructure_) {
00303 unlock();
00304 return false;
00305 }
00306
00307 vector<pid_t> evt_prcids;
00308 vector<UInt_t> evt_numbers;
00309 vector<time_t> evt_tstamps;
00310 try {
00311 evt_prcids = resourceStructure_->cellPrcIds();
00312 evt_numbers = resourceStructure_->cellEvtNumbers();
00313 evt_tstamps = resourceStructure_->cellTimeStamps();
00314 } catch (evf::Exception& e) {
00315 goToFailedState(e);
00316 }
00317
00318 time_t tcurr = time(0);
00319 for (UInt_t i = 0; i < evt_tstamps.size(); i++) {
00320 pid_t pid = evt_prcids[i];
00321 UInt_t evt = evt_numbers[i];
00322 time_t tstamp = evt_tstamps[i];
00323 if (tstamp == 0)
00324 continue;
00325 double tdiff = difftime(tcurr, tstamp);
00326 if (tdiff > timeOutSec_) {
00327 if (processKillerEnabled_) {
00328
00329 kill(pid, 9);
00330 nbTimeoutsWithEvent_++;
00331 }
00332 LOG4CPLUS_ERROR(
00333 log_,
00334 "evt " << evt << " under processing for more than "
00335 << timeOutSec_ << "sec for process " << pid);
00336
00337 }
00338 }
00339
00340 vector<pid_t> prcids;
00341 try {
00342 #ifdef linux
00343 auto lk = resourceStructure_->lockCrashHandler();
00344 #endif
00345 prcids = resourceStructure_->clientPrcIds();
00346 for (UInt_t i = 0; i < prcids.size(); i++) {
00347 pid_t pid = prcids[i];
00348 int status = kill(pid, 0);
00349 if (status != 0) {
00350 LOG4CPLUS_ERROR(
00351 log_,
00352 "EP prc " << pid
00353 << " died, send to error stream if processing.");
00354 if (!resourceStructure_->handleCrashedEP(runNumber_, pid))
00355 nbTimeoutsWithoutEvent_++;
00356 }
00357 }
00358
00359 } catch (evf::Exception& e) {
00360 goToFailedState(e);
00361 }
00362
00363 try {
00364 if ((resourceStructure_->nbResources() != nbRawCells_.value_)
00365 && !shmInconsistent_) {
00366 std::ostringstream ost;
00367 ost << "Watchdog spotted inconsistency in ResourceTable - nbRaw="
00368 << nbRawCells_.value_ << " but nbResources="
00369 << resourceStructure_->nbResources() << " and nbFreeSlots="
00370 << resourceStructure_->nbFreeSlots();
00371 XCEPT_DECLARE(evf::Exception, sentinelException, ost.str());
00372 fsm_->getApp()->notifyQualified("error", sentinelException);
00373
00374
00375 shmInconsistent_ = true;
00376 }
00377 } catch (evf::Exception& e) {
00378 goToFailedState(e);
00379 }
00380
00381 unlock();
00382
00383 ::sleep(watchSleepSec_.value_);
00384 return true;
00385 }
00386
00387
00388 double SharedResources::deltaT(const struct timeval *start,
00389 const struct timeval *end) {
00390 unsigned int sec;
00391 unsigned int usec;
00392
00393 sec = end->tv_sec - start->tv_sec;
00394
00395 if (end->tv_usec > start->tv_usec) {
00396 usec = end->tv_usec - start->tv_usec;
00397 } else {
00398 sec--;
00399 usec = 1000000 - ((unsigned int) (start->tv_usec - end->tv_usec));
00400 }
00401
00402 return ((double) sec) + ((double) usec) / 1000000.0;
00403 }
00404
00405
00406
00407 void SharedResources::startSendDataWorkLoop() throw (evf::Exception) {
00408 try {
00409 LOG4CPLUS_INFO(log_, "Start 'send data' workloop.");
00410 wlSendData_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
00411 "SendData", "waiting");
00412 if (!wlSendData_->isActive())
00413 wlSendData_->activate();
00414 asSendData_ = toolbox::task::bind(this, &SharedResources::sendData,
00415 "SendData");
00416 wlSendData_->submit(asSendData_);
00417
00418 } catch (xcept::Exception& e) {
00419 string msg = "Failed to start workloop 'SendData'.";
00420 XCEPT_RETHROW(evf::Exception, msg, e);
00421 }
00422 }
00423
00424
00425 bool SharedResources::sendData(toolbox::task::WorkLoop*) {
00426 int currentStateID = -1;
00427 bool reschedule = true;
00428
00429 fsm_->transitionReadLock();
00430 currentStateID = fsm_->getCurrentState().stateID();
00431 fsm_->transitionUnlock();
00432
00433 try {
00434 switch (currentStateID) {
00435 case rb_statemachine::RUNNING:
00436 reschedule = resourceStructure_->sendData();
00437 break;
00438 case rb_statemachine::STOPPING:
00439 reschedule = resourceStructure_->sendData();
00440 break;
00441 case rb_statemachine::HALTING:
00442 reschedule = resourceStructure_->sendDataWhileHalting();
00443 break;
00444 case rb_statemachine::FAILED:
00445
00446 return false;
00447 default:
00448 cout << "RBStateMachine: current state: " << currentStateID
00449 << " does not support action: >>sendData<<" << endl;
00450 ::usleep(50000);
00451 reschedule = true;
00452 }
00453 } catch (evf::Exception& e) {
00454 goToFailedState(e);
00455 }
00456
00457 return reschedule;
00458 }
00459
00460
00461
00462 void SharedResources::startSendDqmWorkLoop() throw (evf::Exception) {
00463 try {
00464 LOG4CPLUS_INFO(log_, "Start 'send dqm' workloop.");
00465 wlSendDqm_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
00466 "SendDqm", "waiting");
00467 if (!wlSendDqm_->isActive())
00468 wlSendDqm_->activate();
00469 asSendDqm_ = toolbox::task::bind(this, &SharedResources::sendDqm,
00470 "SendDqm");
00471 wlSendDqm_->submit(asSendDqm_);
00472
00473 } catch (xcept::Exception& e) {
00474 string msg = "Failed to start workloop 'SendDqm'.";
00475 XCEPT_RETHROW(evf::Exception, msg, e);
00476 }
00477 }
00478
00479
00480 bool SharedResources::sendDqm(toolbox::task::WorkLoop*) {
00481 int currentStateID = -1;
00482 bool reschedule = true;
00483
00484 fsm_->transitionReadLock();
00485 currentStateID = fsm_->getCurrentState().stateID();
00486 fsm_->transitionUnlock();
00487
00488 try {
00489 switch (currentStateID) {
00490 case rb_statemachine::RUNNING:
00491 reschedule = resourceStructure_->sendDqm();
00492 break;
00493 case rb_statemachine::STOPPING:
00494 reschedule = resourceStructure_->sendDqm();
00495 break;
00496 case rb_statemachine::HALTING:
00497 reschedule = resourceStructure_->sendDqmWhileHalting();
00498 break;
00499 case rb_statemachine::FAILED:
00500
00501 return false;
00502 default:
00503 cout << "RBStateMachine: current state: " << currentStateID
00504 << " does not support action: >>sendDqm<<" << endl;
00505 ::usleep(50000);
00506 reschedule = true;
00507 }
00508 } catch (evf::Exception& e) {
00509 goToFailedState(e);
00510 }
00511
00512 return reschedule;
00513 }
00514
00515
00516
00517 void SharedResources::startDiscardWorkLoop() throw (evf::Exception) {
00518 try {
00519 LOG4CPLUS_INFO(log_, "Start 'discard' workloop.");
00520 wlDiscard_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
00521 "Discard", "waiting");
00522 if (!wlDiscard_->isActive())
00523 wlDiscard_->activate();
00524 asDiscard_ = toolbox::task::bind(this, &SharedResources::discard,
00525 "Discard");
00526 wlDiscard_->submit(asDiscard_);
00527 resourceStructure_->setActive(true);
00528
00529 } catch (xcept::Exception& e) {
00530 string msg = "Failed to start workloop 'Discard'.";
00531 XCEPT_RETHROW(evf::Exception, msg, e);
00532 }
00533 resourceStructure_->setReadyToShutDown(false);
00534 }
00535
00536
00537 bool SharedResources::discard(toolbox::task::WorkLoop*) {
00538 int currentStateID = -1;
00539 bool reschedule = true;
00540
00541 fsm_->transitionReadLock();
00542 currentStateID = fsm_->getCurrentState().stateID();
00543 fsm_->transitionUnlock();
00544 try {
00545 switch (currentStateID) {
00546 case rb_statemachine::RUNNING:
00547 reschedule = resourceStructure_->discard();
00548 break;
00549 case rb_statemachine::STOPPING:
00550
00551 reschedule = resourceStructure_->discardWhileHalting(true);
00552 break;
00553 case rb_statemachine::HALTING:
00554
00555 reschedule = resourceStructure_->discardWhileHalting(false);
00556 break;
00557 case rb_statemachine::FAILED:
00558
00559 return false;
00560 default:
00561 cout << "RBStateMachine: current state: " << currentStateID
00562 << " does not support action: >>discard<<" << endl;
00563 ::usleep(50000);
00564 reschedule = true;
00565 }
00566 } catch (evf::Exception& e) {
00567 goToFailedState(e);
00568 }
00569
00570 return reschedule;
00571 }
00572
00573
00574 void SharedResources::printWorkLoopStatus() {
00575 cout << "Workloop status===============" << endl;
00576 cout << "==============================" << endl;
00577 if (wlSendData_ != 0)
00578 cout << "SendData -> " << wlSendData_->isActive() << endl;
00579 if (wlSendDqm_ != 0)
00580 cout << "SendDqm -> " << wlSendDqm_->isActive() << endl;
00581 if (wlDiscard_ != 0)
00582 cout << "Discard -> " << wlDiscard_->isActive() << endl;
00583
00584 }
00585
00586
00587 void SharedResources::goToFailedState(evf::Exception& exception) {
00588 reasonForFailed_ = exception.what();
00589 LOG4CPLUS_FATAL(log_,
00590 "Moving to FAILED state! Reason: " << exception.what());
00591 EventPtr fail(new Fail());
00592 commands_.enqEvent(fail);
00593 }