00001
00002
00003
00004
00005
00006
00007
00008
00009
00011
00012 #include "EventFilter/ResourceBroker/interface/SharedResources.h"
00013
00014 #include <signal.h>
00015 #include <iostream>
00016
00017 using std::string;
00018 using std::vector;
00019 using std::cout;
00020 using std::endl;
00021
00022 using namespace evf::rb_statemachine;
00023
00024 SharedResources::SharedResources(Logger log) :
00025 wlMonitoring_(0),
00026 asMonitoring_(0),
00027 wlWatching_(0),
00028 asWatching_(0),
00029 wlSendData_(0),
00030 asSendData_(0),
00031 wlSendDqm_(0),
00032 asSendDqm_(0),
00033 wlDiscard_(0),
00034 asDiscard_(0),
00035 gui_(0),
00036 commands_(CommandQueue()),
00037 log_(log),
00038 bu_(0),
00039 sm_(0),
00040 i2oPool_(0),
00041 ipcManager_(0),
00042 resourceStructure_(0),
00043 runNumber_(0),
00044 deltaT_(0.0),
00045 deltaN_(0),
00046 deltaSumOfSquares_(0),
00047 deltaSumOfSizes_(0),
00048 throughput_(0.0),
00049 rate_(0.0),
00050 average_(0.0),
00051 rms_(0.0),
00052 nbAllocatedEvents_(0),
00053 nbPendingRequests_(0),
00054 nbReceivedEvents_(0),
00055 nbSentEvents_(0),
00056 nbSentDqmEvents_(0),
00057 nbSentErrorEvents_(0),
00058 nbPendingSMDiscards_(0),
00059 nbPendingSMDqmDiscards_(0),
00060 nbDiscardedEvents_(0),
00061
00062 nbReceivedEol_(0),
00063 highestEolReceived_(0),
00064 nbEolPosted_(0),
00065 nbEolDiscarded_(0),
00066 nbLostEvents_(0),
00067 nbDataErrors_(0),
00068 nbCrcErrors_(0),
00069 nbTimeoutsWithEvent_(0),
00070 nbTimeoutsWithoutEvent_(0),
00071 dataErrorFlag_(0),
00072 segmentationMode_(false),
00073 useMessageQueueIPC_(false),
00074 nbClients_(0),
00075 clientPrcIds_(""),
00076 nbRawCells_(16),
00077 nbRecoCells_(8),
00078 nbDqmCells_(8),
00079 rawCellSize_(0x400000)
00080 ,
00081 recoCellSize_(0x800000)
00082 ,
00083 dqmCellSize_(0x800000)
00084
00085 , freeResRequiredForAllocate_(-1), doDropEvents_(false),
00086 doFedIdCheck_(true), doCrcCheck_(1), doDumpEvents_(0),
00087 buClassName_("BU"), buInstance_(0), smClassName_("StorageManager"),
00088 smInstance_(0), resourceStructureTimeout_(200000), monSleepSec_(2),
00089 watchSleepSec_(10), timeOutSec_(30), processKillerEnabled_(true),
00090 useEvmBoard_(true), reasonForFailed_(""), nbAllocateSent_(0),
00091 nbTakeReceived_(0), nbDataDiscardReceived_(0),
00092 nbDqmDiscardReceived_(0), nbSentLast_(0), sumOfSquaresLast_(0),
00093 sumOfSizesLast_(0), frb_(0), shmInconsistent_(false),
00094 allowAccessToResourceStructure_(true) {
00095
00096 sem_init(&lock_, 0, 1);
00097 sem_init(&accessToResourceStructureLock_, 0, 1);
00098
00099 }
00100
00101 SharedResources::~SharedResources() {
00102
00103 }
00104
00105
00106 void SharedResources::configureResources(xdaq::Application* app) {
00107
00108 ipcManager_ = new IPCManager(useMessageQueueIPC_);
00109
00110 ipcManager_->initialise(segmentationMode_.value_, nbRawCells_.value_,
00111 nbRecoCells_.value_, nbDqmCells_.value_, rawCellSize_.value_,
00112 recoCellSize_.value_, dqmCellSize_.value_,
00113 freeResRequiredForAllocate_, bu_, sm_, log_,
00114 resourceStructureTimeout_.value_, frb_, app);
00115
00116 resourceStructure_ = ipcManager_->ipc();
00117
00118 FUResource::doFedIdCheck(doFedIdCheck_);
00119 FUResource::useEvmBoard(useEvmBoard_);
00120 resourceStructure_->setDoCrcCheck(doCrcCheck_);
00121 resourceStructure_->setDoDumpEvents(doDumpEvents_);
00122 reset();
00123 shmInconsistent_ = false;
00124
00125
00126 if (resourceStructure_->nbResources() != nbRawCells_.value_
00127 || resourceStructure_->nbFreeSlots() != nbRawCells_.value_)
00128 shmInconsistent_ = true;
00129 }
00130
00131
00132 void SharedResources::reset() {
00133
00134 gui_->resetCounters();
00135
00136 deltaT_ = 0.0;
00137 deltaN_ = 0;
00138 deltaSumOfSquares_ = 0.0;
00139 deltaSumOfSizes_ = 0;
00140
00141 throughput_ = 0.0;
00142 rate_ = 0.0;
00143 average_ = 0.0;
00144 rms_ = 0.0;
00145
00146 nbSentLast_ = 0;
00147 sumOfSquaresLast_ = 0;
00148 sumOfSizesLast_ = 0;
00149 }
00150
00151
00152 void SharedResources::cancelAllWorkloops() {
00153 if (wlSendData_) {
00154 wlSendData_->cancel();
00155 toolbox::task::getWorkLoopFactory()->removeWorkLoop("SendData",
00156 "waiting");
00157 }
00158 if (wlSendDqm_) {
00159 wlSendDqm_->cancel();
00160 toolbox::task::getWorkLoopFactory()->removeWorkLoop("SendDqm",
00161 "waiting");
00162 }
00163 if (wlDiscard_) {
00164 wlDiscard_->cancel();
00165 toolbox::task::getWorkLoopFactory()->removeWorkLoop("Discard",
00166 "waiting");
00167 }
00168
00169 if (wlMonitoring_) {
00170 wlMonitoring_->cancel();
00171 toolbox::task::getWorkLoopFactory()->removeWorkLoop("Monitoring",
00172 "waiting");
00173 }
00174 if (wlWatching_) {
00175 wlWatching_->cancel();
00176 toolbox::task::getWorkLoopFactory()->removeWorkLoop("Watching",
00177 "waiting");
00178 }
00179 }
00180
00181
00182 void SharedResources::startMonitoringWorkLoop() throw (evf::Exception) {
00183
00184 struct timezone timezone;
00185 gettimeofday(&monStartTime_, &timezone);
00186
00187 try {
00188 wlMonitoring_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
00189 sourceId_ + "Monitoring", "waiting");
00190 if (!wlMonitoring_->isActive())
00191 wlMonitoring_->activate();
00192 asMonitoring_ = toolbox::task::bind(this, &SharedResources::monitoring,
00193 sourceId_ + "Monitoring");
00194 wlMonitoring_->submit(asMonitoring_);
00195 } catch (xcept::Exception& e) {
00196 string msg = "Failed to start workloop 'Monitoring'.";
00197 XCEPT_RETHROW(evf::Exception, msg, e);
00198 }
00199 }
00200
00201
00202 bool SharedResources::monitoring(toolbox::task::WorkLoop*) {
00203
00204 unsigned int nbSent;
00205 uint64_t sumOfSquares;
00206 unsigned int sumOfSizes;
00207 uint64_t deltaSumOfSquares;
00208
00209 lock();
00210 if (0 == resourceStructure_) {
00211 deltaT_.value_ = 0.0;
00212 deltaN_.value_ = 0;
00213 deltaSumOfSquares_.value_ = 0.0;
00214 deltaSumOfSizes_.value_ = 0;
00215 throughput_ = 0.0;
00216 rate_ = 0.0;
00217 average_ = 0.0;
00218 rms_ = 0.0;
00219 unlock();
00220 return false;
00221 } else {
00222 nbSent = resourceStructure_->nbSent();
00223 sumOfSquares = resourceStructure_->sumOfSquares();
00224 sumOfSizes = resourceStructure_->sumOfSizes();
00225 }
00226 unlock();
00227
00228 struct timeval monEndTime;
00229 struct timezone timezone;
00230
00231 gettimeofday(&monEndTime, &timezone);
00232
00233 xdata::getInfoSpaceFactory()->lock();
00234 gui_->monInfoSpace()->lock();
00235
00236 deltaT_.value_ = deltaT(&monStartTime_, &monEndTime);
00237 monStartTime_ = monEndTime;
00238
00239 deltaN_.value_ = nbSent - nbSentLast_;
00240 nbSentLast_ = nbSent;
00241
00242 deltaSumOfSquares = sumOfSquares - sumOfSquaresLast_;
00243 deltaSumOfSquares_.value_ = (double) deltaSumOfSquares;
00244 sumOfSquaresLast_ = sumOfSquares;
00245
00246 deltaSumOfSizes_.value_ = sumOfSizes - sumOfSizesLast_;
00247 sumOfSizesLast_ = sumOfSizes;
00248
00249 if (deltaT_.value_ != 0) {
00250 throughput_ = deltaSumOfSizes_.value_ / deltaT_.value_;
00251 rate_ = deltaN_.value_ / deltaT_.value_;
00252 } else {
00253 throughput_ = 0.0;
00254 rate_ = 0.0;
00255 }
00256
00257 double meanOfSquares, mean, squareOfMean, variance;
00258
00259 if (deltaN_.value_ != 0) {
00260 meanOfSquares = deltaSumOfSquares_.value_ / ((double) (deltaN_.value_));
00261 mean = ((double) (deltaSumOfSizes_.value_))
00262 / ((double) (deltaN_.value_));
00263 squareOfMean = mean * mean;
00264 variance = meanOfSquares - squareOfMean;
00265 if (variance < 0.0)
00266 variance = 0.0;
00267
00268 average_ = deltaSumOfSizes_.value_ / deltaN_.value_;
00269 rms_ = std::sqrt(variance);
00270 } else {
00271 average_ = 0.0;
00272 rms_ = 0.0;
00273 }
00274
00275 gui_->monInfoSpace()->unlock();
00276 xdata::getInfoSpaceFactory()->unlock();
00277
00278 ::sleep(monSleepSec_.value_);
00279
00280 return true;
00281 }
00282
00283
00284 void SharedResources::startWatchingWorkLoop() throw (evf::Exception) {
00285 try {
00286 wlWatching_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
00287 sourceId_ + "Watching", "waiting");
00288 if (!wlWatching_->isActive())
00289 wlWatching_->activate();
00290 asWatching_ = toolbox::task::bind(this, &SharedResources::watching,
00291 sourceId_ + "Watching");
00292 wlWatching_->submit(asWatching_);
00293 } catch (xcept::Exception& e) {
00294 string msg = "Failed to start workloop 'Watching'.";
00295 XCEPT_RETHROW(evf::Exception, msg, e);
00296 }
00297 }
00298
00299
00300 bool SharedResources::watching(toolbox::task::WorkLoop*) {
00301 lock();
00302 if (0 == resourceStructure_) {
00303 unlock();
00304 return false;
00305 }
00306
00307 vector<pid_t> evt_prcids;
00308 vector<UInt_t> evt_numbers;
00309 vector<time_t> evt_tstamps;
00310 try {
00311 evt_prcids = resourceStructure_->cellPrcIds();
00312 evt_numbers = resourceStructure_->cellEvtNumbers();
00313 evt_tstamps = resourceStructure_->cellTimeStamps();
00314 } catch (evf::Exception& e) {
00315 goToFailedState(e);
00316 }
00317
00318 time_t tcurr = time(0);
00319 for (UInt_t i = 0; i < evt_tstamps.size(); i++) {
00320 pid_t pid = evt_prcids[i];
00321 UInt_t evt = evt_numbers[i];
00322 time_t tstamp = evt_tstamps[i];
00323 if (tstamp == 0)
00324 continue;
00325 double tdiff = difftime(tcurr, tstamp);
00326 if (tdiff > timeOutSec_) {
00327 if (processKillerEnabled_) {
00328
00329 kill(pid, 9);
00330 nbTimeoutsWithEvent_++;
00331 }
00332 LOG4CPLUS_ERROR(
00333 log_,
00334 "evt " << evt << " under processing for more than "
00335 << timeOutSec_ << "sec for process " << pid);
00336
00337 }
00338 }
00339
00340 vector<pid_t> prcids;
00341 try {
00342 prcids = resourceStructure_->clientPrcIds();
00343 for (UInt_t i = 0; i < prcids.size(); i++) {
00344 pid_t pid = prcids[i];
00345 int status = kill(pid, 0);
00346 if (status != 0) {
00347 LOG4CPLUS_ERROR(
00348 log_,
00349 "EP prc " << pid
00350 << " died, send to error stream if processing.");
00351 if (!resourceStructure_->handleCrashedEP(runNumber_, pid))
00352 nbTimeoutsWithoutEvent_++;
00353 }
00354 }
00355 } catch (evf::Exception& e) {
00356 goToFailedState(e);
00357 }
00358
00359 try {
00360 if ((resourceStructure_->nbResources() != nbRawCells_.value_)
00361 && !shmInconsistent_) {
00362 std::ostringstream ost;
00363 ost << "Watchdog spotted inconsistency in ResourceTable - nbRaw="
00364 << nbRawCells_.value_ << " but nbResources="
00365 << resourceStructure_->nbResources() << " and nbFreeSlots="
00366 << resourceStructure_->nbFreeSlots();
00367 XCEPT_DECLARE(evf::Exception, sentinelException, ost.str());
00368 fsm_->getApp()->notifyQualified("error", sentinelException);
00369
00370
00371 shmInconsistent_ = true;
00372 }
00373 } catch (evf::Exception& e) {
00374 goToFailedState(e);
00375 }
00376
00377 unlock();
00378
00379 ::sleep(watchSleepSec_.value_);
00380 return true;
00381 }
00382
00383
00384 double SharedResources::deltaT(const struct timeval *start,
00385 const struct timeval *end) {
00386 unsigned int sec;
00387 unsigned int usec;
00388
00389 sec = end->tv_sec - start->tv_sec;
00390
00391 if (end->tv_usec > start->tv_usec) {
00392 usec = end->tv_usec - start->tv_usec;
00393 } else {
00394 sec--;
00395 usec = 1000000 - ((unsigned int) (start->tv_usec - end->tv_usec));
00396 }
00397
00398 return ((double) sec) + ((double) usec) / 1000000.0;
00399 }
00400
00401
00402
00403 void SharedResources::startSendDataWorkLoop() throw (evf::Exception) {
00404 try {
00405 LOG4CPLUS_INFO(log_, "Start 'send data' workloop.");
00406 wlSendData_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
00407 "SendData", "waiting");
00408 if (!wlSendData_->isActive())
00409 wlSendData_->activate();
00410 asSendData_ = toolbox::task::bind(this, &SharedResources::sendData,
00411 "SendData");
00412 wlSendData_->submit(asSendData_);
00413
00414 } catch (xcept::Exception& e) {
00415 string msg = "Failed to start workloop 'SendData'.";
00416 XCEPT_RETHROW(evf::Exception, msg, e);
00417 }
00418 }
00419
00420
00421 bool SharedResources::sendData(toolbox::task::WorkLoop*) {
00422 int currentStateID = -1;
00423 bool reschedule = true;
00424
00425 fsm_->transitionReadLock();
00426 currentStateID = fsm_->getCurrentState().stateID();
00427 fsm_->transitionUnlock();
00428
00429 try {
00430 switch (currentStateID) {
00431 case rb_statemachine::RUNNING:
00432 reschedule = resourceStructure_->sendData();
00433 break;
00434 case rb_statemachine::STOPPING:
00435 reschedule = resourceStructure_->sendData();
00436 break;
00437 case rb_statemachine::HALTING:
00438 reschedule = resourceStructure_->sendDataWhileHalting();
00439 break;
00440 case rb_statemachine::FAILED:
00441
00442 return false;
00443 default:
00444 cout << "RBStateMachine: current state: " << currentStateID
00445 << " does not support action: >>sendData<<" << endl;
00446 ::usleep(50000);
00447 reschedule = true;
00448 }
00449 } catch (evf::Exception& e) {
00450 goToFailedState(e);
00451 }
00452
00453 return reschedule;
00454 }
00455
00456
00457
00458 void SharedResources::startSendDqmWorkLoop() throw (evf::Exception) {
00459 try {
00460 LOG4CPLUS_INFO(log_, "Start 'send dqm' workloop.");
00461 wlSendDqm_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
00462 "SendDqm", "waiting");
00463 if (!wlSendDqm_->isActive())
00464 wlSendDqm_->activate();
00465 asSendDqm_ = toolbox::task::bind(this, &SharedResources::sendDqm,
00466 "SendDqm");
00467 wlSendDqm_->submit(asSendDqm_);
00468
00469 } catch (xcept::Exception& e) {
00470 string msg = "Failed to start workloop 'SendDqm'.";
00471 XCEPT_RETHROW(evf::Exception, msg, e);
00472 }
00473 }
00474
00475
00476 bool SharedResources::sendDqm(toolbox::task::WorkLoop*) {
00477 int currentStateID = -1;
00478 bool reschedule = true;
00479
00480 fsm_->transitionReadLock();
00481 currentStateID = fsm_->getCurrentState().stateID();
00482 fsm_->transitionUnlock();
00483
00484 try {
00485 switch (currentStateID) {
00486 case rb_statemachine::RUNNING:
00487 reschedule = resourceStructure_->sendDqm();
00488 break;
00489 case rb_statemachine::STOPPING:
00490 reschedule = resourceStructure_->sendDqm();
00491 break;
00492 case rb_statemachine::HALTING:
00493 reschedule = resourceStructure_->sendDqmWhileHalting();
00494 break;
00495 case rb_statemachine::FAILED:
00496
00497 return false;
00498 default:
00499 cout << "RBStateMachine: current state: " << currentStateID
00500 << " does not support action: >>sendDqm<<" << endl;
00501 ::usleep(50000);
00502 reschedule = true;
00503 }
00504 } catch (evf::Exception& e) {
00505 goToFailedState(e);
00506 }
00507
00508 return reschedule;
00509 }
00510
00511
00512
00513 void SharedResources::startDiscardWorkLoop() throw (evf::Exception) {
00514 try {
00515 LOG4CPLUS_INFO(log_, "Start 'discard' workloop.");
00516 wlDiscard_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
00517 "Discard", "waiting");
00518 if (!wlDiscard_->isActive())
00519 wlDiscard_->activate();
00520 asDiscard_ = toolbox::task::bind(this, &SharedResources::discard,
00521 "Discard");
00522 wlDiscard_->submit(asDiscard_);
00523 resourceStructure_->setActive(true);
00524
00525 } catch (xcept::Exception& e) {
00526 string msg = "Failed to start workloop 'Discard'.";
00527 XCEPT_RETHROW(evf::Exception, msg, e);
00528 }
00529 resourceStructure_->setReadyToShutDown(false);
00530 }
00531
00532
00533 bool SharedResources::discard(toolbox::task::WorkLoop*) {
00534 int currentStateID = -1;
00535 bool reschedule = true;
00536
00537 fsm_->transitionReadLock();
00538 currentStateID = fsm_->getCurrentState().stateID();
00539 fsm_->transitionUnlock();
00540 try {
00541 switch (currentStateID) {
00542 case rb_statemachine::RUNNING:
00543 reschedule = resourceStructure_->discard();
00544 break;
00545 case rb_statemachine::STOPPING:
00546
00547 reschedule = resourceStructure_->discardWhileHalting(true);
00548 break;
00549 case rb_statemachine::HALTING:
00550
00551 reschedule = resourceStructure_->discardWhileHalting(false);
00552 break;
00553 case rb_statemachine::FAILED:
00554
00555 return false;
00556 default:
00557 cout << "RBStateMachine: current state: " << currentStateID
00558 << " does not support action: >>discard<<" << endl;
00559 ::usleep(50000);
00560 reschedule = true;
00561 }
00562 } catch (evf::Exception& e) {
00563 goToFailedState(e);
00564 }
00565
00566 return reschedule;
00567 }
00568
00569
00570 void SharedResources::printWorkLoopStatus() {
00571 cout << "Workloop status===============" << endl;
00572 cout << "==============================" << endl;
00573 if (wlSendData_ != 0)
00574 cout << "SendData -> " << wlSendData_->isActive() << endl;
00575 if (wlSendDqm_ != 0)
00576 cout << "SendDqm -> " << wlSendDqm_->isActive() << endl;
00577 if (wlDiscard_ != 0)
00578 cout << "Discard -> " << wlDiscard_->isActive() << endl;
00579
00580 }
00581
00582
00583 void SharedResources::goToFailedState(evf::Exception& exception) {
00584 reasonForFailed_ = exception.what();
00585 LOG4CPLUS_FATAL(log_,
00586 "Moving to FAILED state! Reason: " << exception.what());
00587 EventPtr fail(new Fail());
00588 commands_.enqEvent(fail);
00589 }