CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_5_3_13_patch3/src/EventFilter/ResourceBroker/src/SharedResources.cc

Go to the documentation of this file.
00001 
00002 //
00003 // SharedResources.cc
00004 // -------
00005 //
00006 // Resources shared between FSM states.
00007 //
00008 // Created on: Sep 21, 2011
00009 //                                                                              Andrei Spataru : aspataru@cern.ch
00011 
00012 #include "EventFilter/ResourceBroker/interface/SharedResources.h"
00013 
00014 #include <signal.h>
00015 #include <iostream>
00016 
00017 using std::string;
00018 using std::vector;
00019 using std::cout;
00020 using std::endl;
00021 
00022 using namespace evf::rb_statemachine;
00023 
00024 SharedResources::SharedResources(Logger log) :
00025                         wlMonitoring_(0),
00026                         asMonitoring_(0),
00027                         wlWatching_(0),
00028                         asWatching_(0),
00029                         wlSendData_(0),
00030                         asSendData_(0),
00031                         wlSendDqm_(0),
00032                         asSendDqm_(0),
00033                         wlDiscard_(0),
00034                         asDiscard_(0),
00035                         gui_(0),
00036                         commands_(CommandQueue()),
00037                         log_(log),
00038                         bu_(0),
00039                         sm_(0),
00040                         i2oPool_(0),
00041                         ipcManager_(0),
00042                         resourceStructure_(0),
00043                         runNumber_(0),
00044                         deltaT_(0.0),
00045                         deltaN_(0),
00046                         deltaSumOfSquares_(0),
00047                         deltaSumOfSizes_(0),
00048                         throughput_(0.0),
00049                         rate_(0.0),
00050                         average_(0.0),
00051                         rms_(0.0),
00052                         nbAllocatedEvents_(0),
00053                         nbPendingRequests_(0),
00054                         nbReceivedEvents_(0),
00055                         nbSentEvents_(0),
00056                         nbSentDqmEvents_(0),
00057                         nbSentErrorEvents_(0),
00058                         nbPendingSMDiscards_(0),
00059                         nbPendingSMDqmDiscards_(0),
00060                         nbDiscardedEvents_(0),
00061                         // UPDATED
00062                         nbReceivedEol_(0),
00063                         highestEolReceived_(0),
00064                         nbEolPosted_(0),
00065                         nbEolDiscarded_(0),
00066                         nbLostEvents_(0),
00067                         nbDataErrors_(0),
00068                         nbCrcErrors_(0),
00069                         nbTimeoutsWithEvent_(0),
00070                         nbTimeoutsWithoutEvent_(0),
00071                         dataErrorFlag_(0),
00072                         segmentationMode_(false),
00073                         useMessageQueueIPC_(false),
00074                         nbClients_(0),
00075                         clientPrcIds_(""),
00076                         nbRawCells_(16),
00077                         nbRecoCells_(8),
00078                         nbDqmCells_(8),
00079                         rawCellSize_(0x400000) // 4MB
00080                         ,
00081                         recoCellSize_(0x800000) // 8MB
00082                         ,
00083                         dqmCellSize_(0x800000) // 8MB
00084                         // at least nbRawCells / 2 free resources to send allocate
00085                         , freeResRequiredForAllocate_(-1), doDropEvents_(false),
00086                         doFedIdCheck_(true), doCrcCheck_(1), doDumpEvents_(0),
00087                         buClassName_("BU"), buInstance_(0), smClassName_("StorageManager"),
00088                         smInstance_(0), resourceStructureTimeout_(200000), monSleepSec_(2),
00089                         watchSleepSec_(10), timeOutSec_(30), processKillerEnabled_(true),
00090                         useEvmBoard_(true), reasonForFailed_(""), nbAllocateSent_(0),
00091                         nbTakeReceived_(0), nbDataDiscardReceived_(0),
00092                         nbDqmDiscardReceived_(0), nbSentLast_(0), sumOfSquaresLast_(0),
00093                         sumOfSizesLast_(0), frb_(0), shmInconsistent_(false),
00094                         allowI2ODiscards_(true) {
00095 
00096         sem_init(&lock_, 0, 1);
00097         sem_init(&accessToResourceStructureLock_, 0, 1);
00098 
00099 }
00100 
00101 SharedResources::~SharedResources() {
00102 
00103 }
00104 
00105 //______________________________________________________________________________
00106 void SharedResources::configureResources(xdaq::Application* app) {
00107 
00108         ipcManager_ = new IPCManager(useMessageQueueIPC_);
00109 
00110         ipcManager_->initialise(segmentationMode_.value_, nbRawCells_.value_,
00111                         nbRecoCells_.value_, nbDqmCells_.value_, rawCellSize_.value_,
00112                         recoCellSize_.value_, dqmCellSize_.value_,
00113                         freeResRequiredForAllocate_, bu_, sm_, log_,
00114                         resourceStructureTimeout_.value_, frb_, app);
00115 
00116         resourceStructure_ = ipcManager_->ipc();
00117 
00118         FUResource::doFedIdCheck(doFedIdCheck_);
00119         FUResource::useEvmBoard(useEvmBoard_);
00120         resourceStructure_->setDoCrcCheck(doCrcCheck_);
00121         resourceStructure_->setDoDumpEvents(doDumpEvents_);
00122         reset();
00123         shmInconsistent_ = false;
00124 
00125         // XXX shmInconsistent check
00126         if (resourceStructure_->nbResources() != nbRawCells_.value_
00127                         || resourceStructure_->nbFreeSlots() != nbRawCells_.value_)
00128                 shmInconsistent_ = true;
00129 }
00130 
00131 //______________________________________________________________________________
00132 void SharedResources::reset() {
00133 
00134         gui_->resetCounters();
00135 
00136         deltaT_ = 0.0;
00137         deltaN_ = 0;
00138         deltaSumOfSquares_ = 0.0;
00139         deltaSumOfSizes_ = 0;
00140 
00141         throughput_ = 0.0;
00142         rate_ = 0.0;
00143         average_ = 0.0;
00144         rms_ = 0.0;
00145 
00146         nbSentLast_ = 0;
00147         sumOfSquaresLast_ = 0;
00148         sumOfSizesLast_ = 0;
00149 }
00150 
00151 //______________________________________________________________________________
00152 void SharedResources::cancelAllWorkloops() {
00153         if (wlSendData_) {
00154                 wlSendData_->cancel();
00155                 toolbox::task::getWorkLoopFactory()->removeWorkLoop("SendData",
00156                                 "waiting");
00157         }
00158         if (wlSendDqm_) {
00159                 wlSendDqm_->cancel();
00160                 toolbox::task::getWorkLoopFactory()->removeWorkLoop("SendDqm",
00161                                 "waiting");
00162         }
00163         if (wlDiscard_) {
00164                 wlDiscard_->cancel();
00165                 toolbox::task::getWorkLoopFactory()->removeWorkLoop("Discard",
00166                                 "waiting");
00167         }
00168 
00169         if (wlMonitoring_) {
00170                 wlMonitoring_->cancel();
00171                 toolbox::task::getWorkLoopFactory()->removeWorkLoop("Monitoring",
00172                                 "waiting");
00173         }
00174         if (wlWatching_) {
00175                 wlWatching_->cancel();
00176                 toolbox::task::getWorkLoopFactory()->removeWorkLoop("Watching",
00177                                 "waiting");
00178         }
00179 }
00180 
00181 //______________________________________________________________________________
00182 void SharedResources::startMonitoringWorkLoop() throw (evf::Exception) {
00183 
00184         struct timezone timezone;
00185         gettimeofday(&monStartTime_, &timezone);
00186 
00187         try {
00188                 wlMonitoring_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
00189                                 sourceId_ + "Monitoring", "waiting");
00190                 if (!wlMonitoring_->isActive())
00191                         wlMonitoring_->activate();
00192                 asMonitoring_ = toolbox::task::bind(this, &SharedResources::monitoring,
00193                                 sourceId_ + "Monitoring");
00194                 wlMonitoring_->submit(asMonitoring_);
00195         } catch (xcept::Exception& e) {
00196                 string msg = "Failed to start workloop 'Monitoring'.";
00197                 XCEPT_RETHROW(evf::Exception, msg, e);
00198         }
00199 }
00200 
00201 //______________________________________________________________________________
00202 bool SharedResources::monitoring(toolbox::task::WorkLoop*) {
00203 
00204         unsigned int nbSent;
00205         uint64_t sumOfSquares;
00206         unsigned int sumOfSizes;
00207         uint64_t deltaSumOfSquares;
00208 
00209         lock();
00210         if (0 == resourceStructure_) {
00211                 deltaT_.value_ = 0.0;
00212                 deltaN_.value_ = 0;
00213                 deltaSumOfSquares_.value_ = 0.0;
00214                 deltaSumOfSizes_.value_ = 0;
00215                 throughput_ = 0.0;
00216                 rate_ = 0.0;
00217                 average_ = 0.0;
00218                 rms_ = 0.0;
00219                 unlock();
00220                 return false;
00221         } else {
00222                 nbSent = resourceStructure_->nbSent();
00223                 sumOfSquares = resourceStructure_->sumOfSquares();
00224                 sumOfSizes = resourceStructure_->sumOfSizes();
00225         }
00226         unlock();
00227 
00228         struct timeval monEndTime;
00229         struct timezone timezone;
00230 
00231         gettimeofday(&monEndTime, &timezone);
00232 
00233         xdata::getInfoSpaceFactory()->lock();
00234         gui_->monInfoSpace()->lock();
00235 
00236         deltaT_.value_ = deltaT(&monStartTime_, &monEndTime);
00237         monStartTime_ = monEndTime;
00238 
00239         deltaN_.value_ = nbSent - nbSentLast_;
00240         nbSentLast_ = nbSent;
00241 
00242         deltaSumOfSquares = sumOfSquares - sumOfSquaresLast_;
00243         deltaSumOfSquares_.value_ = (double) deltaSumOfSquares;
00244         sumOfSquaresLast_ = sumOfSquares;
00245 
00246         deltaSumOfSizes_.value_ = sumOfSizes - sumOfSizesLast_;
00247         sumOfSizesLast_ = sumOfSizes;
00248 
00249         if (deltaT_.value_ != 0) {
00250                 throughput_ = deltaSumOfSizes_.value_ / deltaT_.value_;
00251                 rate_ = deltaN_.value_ / deltaT_.value_;
00252         } else {
00253                 throughput_ = 0.0;
00254                 rate_ = 0.0;
00255         }
00256 
00257         double meanOfSquares, mean, squareOfMean, variance;
00258 
00259         if (deltaN_.value_ != 0) {
00260                 meanOfSquares = deltaSumOfSquares_.value_ / ((double) (deltaN_.value_));
00261                 mean = ((double) (deltaSumOfSizes_.value_))
00262                                 / ((double) (deltaN_.value_));
00263                 squareOfMean = mean * mean;
00264                 variance = meanOfSquares - squareOfMean;
00265                 if (variance < 0.0)
00266                         variance = 0.0;
00267 
00268                 average_ = deltaSumOfSizes_.value_ / deltaN_.value_;
00269                 rms_ = std::sqrt(variance);
00270         } else {
00271                 average_ = 0.0;
00272                 rms_ = 0.0;
00273         }
00274 
00275         gui_->monInfoSpace()->unlock();
00276         xdata::getInfoSpaceFactory()->unlock();
00277 
00278 	::sleep(monSleepSec_.value_);
00279 
00280         return true;
00281 }
00282 
00283 //______________________________________________________________________________
00284 void SharedResources::startWatchingWorkLoop() throw (evf::Exception) {
00285         try {
00286                 wlWatching_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
00287                                 sourceId_ + "Watching", "waiting");
00288                 if (!wlWatching_->isActive())
00289                         wlWatching_->activate();
00290                 asWatching_ = toolbox::task::bind(this, &SharedResources::watching,
00291                                 sourceId_ + "Watching");
00292                 wlWatching_->submit(asWatching_);
00293         } catch (xcept::Exception& e) {
00294                 string msg = "Failed to start workloop 'Watching'.";
00295                 XCEPT_RETHROW(evf::Exception, msg, e);
00296         }
00297 }
00298 
00299 //______________________________________________________________________________
00300 bool SharedResources::watching(toolbox::task::WorkLoop*) {
00301         lock();
00302         if (0 == resourceStructure_) {
00303                 unlock();
00304                 return false;
00305         }
00306 
00307         vector<pid_t> evt_prcids;
00308         vector<UInt_t> evt_numbers;
00309         vector<time_t> evt_tstamps;
00310         try {
00311                 evt_prcids = resourceStructure_->cellPrcIds();
00312                 evt_numbers = resourceStructure_->cellEvtNumbers();
00313                 evt_tstamps = resourceStructure_->cellTimeStamps();
00314         } catch (evf::Exception& e) {
00315                 goToFailedState(e);
00316         }
00317 
00318         time_t tcurr = time(0);
00319         for (UInt_t i = 0; i < evt_tstamps.size(); i++) {
00320                 pid_t pid = evt_prcids[i];
00321                 UInt_t evt = evt_numbers[i];
00322                 time_t tstamp = evt_tstamps[i];
00323                 if (tstamp == 0)
00324                         continue;
00325                 double tdiff = difftime(tcurr, tstamp);
00326                 if (tdiff > timeOutSec_) {
00327                         if (processKillerEnabled_) {
00328                                 // UPDATED
00329                                 kill(pid, 9);
00330                                 nbTimeoutsWithEvent_++;
00331                         }
00332                         LOG4CPLUS_ERROR(
00333                                         log_,
00334                                         "evt " << evt << " under processing for more than "
00335                                                         << timeOutSec_ << "sec for process " << pid);
00336 
00337                 }
00338         }
00339 
00340         vector<pid_t> prcids;
00341         try {  
00342                 #ifdef linux 
00343                 auto lk = resourceStructure_->lockCrashHandler();
00344                 #endif
00345                 prcids = resourceStructure_->clientPrcIds();
00346                 for (UInt_t i = 0; i < prcids.size(); i++) {
00347                         pid_t pid = prcids[i];
00348                         int status = kill(pid, 0);
00349                         if (status != 0) {
00350                                 LOG4CPLUS_ERROR(
00351                                                 log_,
00352                                                 "EP prc " << pid
00353                                                                 << " died, send to error stream if processing.");
00354                                 if (!resourceStructure_->handleCrashedEP(runNumber_, pid))
00355                                         nbTimeoutsWithoutEvent_++;
00356                         }
00357                 }
00358 
00359         } catch (evf::Exception& e) {
00360                 goToFailedState(e);
00361         }
00362 
00363         try {
00364                 if ((resourceStructure_->nbResources() != nbRawCells_.value_)
00365                                 && !shmInconsistent_) {
00366                         std::ostringstream ost;
00367                         ost << "Watchdog spotted inconsistency in ResourceTable - nbRaw="
00368                                         << nbRawCells_.value_ << " but nbResources="
00369                                         << resourceStructure_->nbResources() << " and nbFreeSlots="
00370                                         << resourceStructure_->nbFreeSlots();
00371                         XCEPT_DECLARE(evf::Exception, sentinelException, ost.str());
00372                         fsm_->getApp()->notifyQualified("error", sentinelException);
00373 
00374                         // XXX shmInconsistent
00375                         shmInconsistent_ = true;
00376                 }
00377         } catch (evf::Exception& e) {
00378                 goToFailedState(e);
00379         }
00380 
00381         unlock();
00382 
00383 	::sleep(watchSleepSec_.value_);
00384         return true;
00385 }
00386 
00387 //______________________________________________________________________________
00388 double SharedResources::deltaT(const struct timeval *start,
00389                 const struct timeval *end) {
00390         unsigned int sec;
00391         unsigned int usec;
00392 
00393         sec = end->tv_sec - start->tv_sec;
00394 
00395         if (end->tv_usec > start->tv_usec) {
00396                 usec = end->tv_usec - start->tv_usec;
00397         } else {
00398                 sec--;
00399                 usec = 1000000 - ((unsigned int) (start->tv_usec - end->tv_usec));
00400         }
00401 
00402         return ((double) sec) + ((double) usec) / 1000000.0;
00403 }
00404 
00405 // sendData workloop STARTER
00406 //______________________________________________________________________________
00407 void SharedResources::startSendDataWorkLoop() throw (evf::Exception) {
00408         try {
00409                 LOG4CPLUS_INFO(log_, "Start 'send data' workloop.");
00410                 wlSendData_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
00411                                 "SendData", "waiting");
00412                 if (!wlSendData_->isActive())
00413                         wlSendData_->activate();
00414                 asSendData_ = toolbox::task::bind(this, &SharedResources::sendData,
00415                                 "SendData");
00416                 wlSendData_->submit(asSendData_);
00417 
00418         } catch (xcept::Exception& e) {
00419                 string msg = "Failed to start workloop 'SendData'.";
00420                 XCEPT_RETHROW(evf::Exception, msg, e);
00421         }
00422 }
00423 
00424 //  sendData workloop DISPATCHING SIGNATURE
00425 bool SharedResources::sendData(toolbox::task::WorkLoop*) {
00426         int currentStateID = -1;
00427         bool reschedule = true;
00428 
00429         fsm_->transitionReadLock();
00430         currentStateID = fsm_->getCurrentState().stateID();
00431         fsm_->transitionUnlock();
00432 
00433         try {
00434                 switch (currentStateID) {
00435                 case rb_statemachine::RUNNING:
00436                         reschedule = resourceStructure_->sendData();
00437                         break;
00438                 case rb_statemachine::STOPPING:
00439                         reschedule = resourceStructure_->sendData();
00440                         break;
00441                 case rb_statemachine::HALTING:
00442                         reschedule = resourceStructure_->sendDataWhileHalting();
00443                         break;
00444                 case rb_statemachine::FAILED:
00445                         // workloop must be exited in this state
00446                         return false;
00447                 default:
00448                         cout << "RBStateMachine: current state: " << currentStateID
00449                                         << " does not support action: >>sendData<<" << endl;
00450                         ::usleep(50000);
00451                         reschedule = true;
00452                 }
00453         } catch (evf::Exception& e) {
00454                 goToFailedState(e);
00455         }
00456 
00457         return reschedule;
00458 }
00459 
00460 // sendDqm workloop STARTER
00461 //______________________________________________________________________________
00462 void SharedResources::startSendDqmWorkLoop() throw (evf::Exception) {
00463         try {
00464                 LOG4CPLUS_INFO(log_, "Start 'send dqm' workloop.");
00465                 wlSendDqm_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
00466                                 "SendDqm", "waiting");
00467                 if (!wlSendDqm_->isActive())
00468                         wlSendDqm_->activate();
00469                 asSendDqm_ = toolbox::task::bind(this, &SharedResources::sendDqm,
00470                                 "SendDqm");
00471                 wlSendDqm_->submit(asSendDqm_);
00472 
00473         } catch (xcept::Exception& e) {
00474                 string msg = "Failed to start workloop 'SendDqm'.";
00475                 XCEPT_RETHROW(evf::Exception, msg, e);
00476         }
00477 }
00478 
00479 //  sendDqm workloop DISPATCHING SIGNATURE
00480 bool SharedResources::sendDqm(toolbox::task::WorkLoop*) {
00481         int currentStateID = -1;
00482         bool reschedule = true;
00483 
00484         fsm_->transitionReadLock();
00485         currentStateID = fsm_->getCurrentState().stateID();
00486         fsm_->transitionUnlock();
00487 
00488         try {
00489                 switch (currentStateID) {
00490                 case rb_statemachine::RUNNING:
00491                         reschedule = resourceStructure_->sendDqm();
00492                         break;
00493                 case rb_statemachine::STOPPING:
00494                         reschedule = resourceStructure_->sendDqm();
00495                         break;
00496                 case rb_statemachine::HALTING:
00497                         reschedule = resourceStructure_->sendDqmWhileHalting();
00498                         break;
00499                 case rb_statemachine::FAILED:
00500                         // workloop must be exited in this state
00501                         return false;
00502                 default:
00503                         cout << "RBStateMachine: current state: " << currentStateID
00504                                         << " does not support action: >>sendDqm<<" << endl;
00505                         ::usleep(50000);
00506                         reschedule = true;
00507                 }
00508         } catch (evf::Exception& e) {
00509                 goToFailedState(e);
00510         }
00511 
00512         return reschedule;
00513 }
00514 
00515 // discard workloop STARTER
00516 //______________________________________________________________________________
00517 void SharedResources::startDiscardWorkLoop() throw (evf::Exception) {
00518         try {
00519                 LOG4CPLUS_INFO(log_, "Start 'discard' workloop.");
00520                 wlDiscard_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
00521                                 "Discard", "waiting");
00522                 if (!wlDiscard_->isActive())
00523                         wlDiscard_->activate();
00524                 asDiscard_ = toolbox::task::bind(this, &SharedResources::discard,
00525                                 "Discard");
00526                 wlDiscard_->submit(asDiscard_);
00527                 resourceStructure_->setActive(true);
00528 
00529         } catch (xcept::Exception& e) {
00530                 string msg = "Failed to start workloop 'Discard'.";
00531                 XCEPT_RETHROW(evf::Exception, msg, e);
00532         }
00533         resourceStructure_->setReadyToShutDown(false);
00534 }
00535 
00536 //  discard workloop DISPATCHING SIGNATURE
00537 bool SharedResources::discard(toolbox::task::WorkLoop*) {
00538         int currentStateID = -1;
00539         bool reschedule = true;
00540 
00541         fsm_->transitionReadLock();
00542         currentStateID = fsm_->getCurrentState().stateID();
00543         fsm_->transitionUnlock();
00544         try {
00545                 switch (currentStateID) {
00546                 case rb_statemachine::RUNNING:
00547                         reschedule = resourceStructure_->discard();
00548                         break;
00549                 case rb_statemachine::STOPPING:
00550                         // XXX: communication with BU after stop!
00551                         reschedule = resourceStructure_->discardWhileHalting(true);
00552                         break;
00553                 case rb_statemachine::HALTING:
00554                         // XXX: no more communication with BU after halt!
00555                         reschedule = resourceStructure_->discardWhileHalting(false);
00556                         break;
00557                 case rb_statemachine::FAILED:
00558                         // workloop must be exited in this state
00559                         return false;
00560                 default:
00561                         cout << "RBStateMachine: current state: " << currentStateID
00562                                         << " does not support action: >>discard<<" << endl;
00563                         ::usleep(50000);
00564                         reschedule = true;
00565                 }
00566         } catch (evf::Exception& e) {
00567                 goToFailedState(e);
00568         }
00569 
00570         return reschedule;
00571 }
00572 
00573 //______________________________________________________________________________
00574 void SharedResources::printWorkLoopStatus() {
00575         cout << "Workloop status===============" << endl;
00576         cout << "==============================" << endl;
00577         if (wlSendData_ != 0)
00578                 cout << "SendData -> " << wlSendData_->isActive() << endl;
00579         if (wlSendDqm_ != 0)
00580                 cout << "SendDqm  -> " << wlSendDqm_->isActive() << endl;
00581         if (wlDiscard_ != 0)
00582                 cout << "Discard  -> " << wlDiscard_->isActive() << endl;
00583         //cout << "Workloops Active  -> " << isActive_ << endl;
00584 }
00585 
00586 //______________________________________________________________________________
00587 void SharedResources::goToFailedState(evf::Exception& exception) {
00588         reasonForFailed_ = exception.what();
00589         LOG4CPLUS_FATAL(log_,
00590                         "Moving to FAILED state! Reason: " << exception.what());
00591         EventPtr fail(new Fail());
00592         commands_.enqEvent(fail);
00593 }