CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_1/src/EventFilter/ResourceBroker/src/SharedResources.cc

Go to the documentation of this file.
00001 
00002 //
00003 // SharedResources.cc
00004 // -------
00005 //
00006 // Resources shared between FSM states.
00007 //
00008 // Created on: Sep 21, 2011
00009 //                                                                              Andrei Spataru : aspataru@cern.ch
00011 
00012 #include "EventFilter/ResourceBroker/interface/SharedResources.h"
00013 
00014 #include <signal.h>
00015 #include <iostream>
00016 
00017 using std::string;
00018 using std::vector;
00019 using std::cout;
00020 using std::endl;
00021 
00022 using namespace evf::rb_statemachine;
00023 
00024 SharedResources::SharedResources(Logger log) :
00025                         wlMonitoring_(0),
00026                         asMonitoring_(0),
00027                         wlWatching_(0),
00028                         asWatching_(0),
00029                         wlSendData_(0),
00030                         asSendData_(0),
00031                         wlSendDqm_(0),
00032                         asSendDqm_(0),
00033                         wlDiscard_(0),
00034                         asDiscard_(0),
00035                         gui_(0),
00036                         commands_(CommandQueue()),
00037                         log_(log),
00038                         bu_(0),
00039                         sm_(0),
00040                         i2oPool_(0),
00041                         ipcManager_(0),
00042                         resourceStructure_(0),
00043                         runNumber_(0),
00044                         deltaT_(0.0),
00045                         deltaN_(0),
00046                         deltaSumOfSquares_(0),
00047                         deltaSumOfSizes_(0),
00048                         throughput_(0.0),
00049                         rate_(0.0),
00050                         average_(0.0),
00051                         rms_(0.0),
00052                         nbAllocatedEvents_(0),
00053                         nbPendingRequests_(0),
00054                         nbReceivedEvents_(0),
00055                         nbSentEvents_(0),
00056                         nbSentDqmEvents_(0),
00057                         nbSentErrorEvents_(0),
00058                         nbPendingSMDiscards_(0),
00059                         nbPendingSMDqmDiscards_(0),
00060                         nbDiscardedEvents_(0),
00061                         // UPDATED
00062                         nbReceivedEol_(0),
00063                         highestEolReceived_(0),
00064                         nbEolPosted_(0),
00065                         nbEolDiscarded_(0),
00066                         nbLostEvents_(0),
00067                         nbDataErrors_(0),
00068                         nbCrcErrors_(0),
00069                         nbTimeoutsWithEvent_(0),
00070                         nbTimeoutsWithoutEvent_(0),
00071                         dataErrorFlag_(0),
00072                         segmentationMode_(false),
00073                         useMessageQueueIPC_(false),
00074                         nbClients_(0),
00075                         clientPrcIds_(""),
00076                         nbRawCells_(16),
00077                         nbRecoCells_(8),
00078                         nbDqmCells_(8),
00079                         rawCellSize_(0x400000) // 4MB
00080                         ,
00081                         recoCellSize_(0x800000) // 8MB
00082                         ,
00083                         dqmCellSize_(0x800000) // 8MB
00084                         // at least nbRawCells / 2 free resources to send allocate
00085                         , freeResRequiredForAllocate_(-1), doDropEvents_(false),
00086                         doFedIdCheck_(true), doCrcCheck_(1), doDumpEvents_(0),
00087                         buClassName_("BU"), buInstance_(0), smClassName_("StorageManager"),
00088                         smInstance_(0), resourceStructureTimeout_(200000), monSleepSec_(2),
00089                         watchSleepSec_(10), timeOutSec_(30), processKillerEnabled_(true),
00090                         useEvmBoard_(true), reasonForFailed_(""), nbAllocateSent_(0),
00091                         nbTakeReceived_(0), nbDataDiscardReceived_(0),
00092                         nbDqmDiscardReceived_(0), nbSentLast_(0), sumOfSquaresLast_(0),
00093                         sumOfSizesLast_(0), frb_(0), shmInconsistent_(false),
00094                         allowAccessToResourceStructure_(true) {
00095 
00096         sem_init(&lock_, 0, 1);
00097         sem_init(&accessToResourceStructureLock_, 0, 1);
00098 
00099 }
00100 
00101 SharedResources::~SharedResources() {
00102 
00103 }
00104 
00105 //______________________________________________________________________________
00106 void SharedResources::configureResources(xdaq::Application* app) {
00107 
00108         ipcManager_ = new IPCManager(useMessageQueueIPC_);
00109 
00110         ipcManager_->initialise(segmentationMode_.value_, nbRawCells_.value_,
00111                         nbRecoCells_.value_, nbDqmCells_.value_, rawCellSize_.value_,
00112                         recoCellSize_.value_, dqmCellSize_.value_,
00113                         freeResRequiredForAllocate_, bu_, sm_, log_,
00114                         resourceStructureTimeout_.value_, frb_, app);
00115 
00116         resourceStructure_ = ipcManager_->ipc();
00117 
00118         FUResource::doFedIdCheck(doFedIdCheck_);
00119         FUResource::useEvmBoard(useEvmBoard_);
00120         resourceStructure_->setDoCrcCheck(doCrcCheck_);
00121         resourceStructure_->setDoDumpEvents(doDumpEvents_);
00122         reset();
00123         shmInconsistent_ = false;
00124 
00125         // XXX shmInconsistent check
00126         if (resourceStructure_->nbResources() != nbRawCells_.value_
00127                         || resourceStructure_->nbFreeSlots() != nbRawCells_.value_)
00128                 shmInconsistent_ = true;
00129 }
00130 
00131 //______________________________________________________________________________
00132 void SharedResources::reset() {
00133 
00134         gui_->resetCounters();
00135 
00136         deltaT_ = 0.0;
00137         deltaN_ = 0;
00138         deltaSumOfSquares_ = 0.0;
00139         deltaSumOfSizes_ = 0;
00140 
00141         throughput_ = 0.0;
00142         rate_ = 0.0;
00143         average_ = 0.0;
00144         rms_ = 0.0;
00145 
00146         nbSentLast_ = 0;
00147         sumOfSquaresLast_ = 0;
00148         sumOfSizesLast_ = 0;
00149 }
00150 
00151 //______________________________________________________________________________
00152 void SharedResources::cancelAllWorkloops() {
00153         if (wlSendData_) {
00154                 wlSendData_->cancel();
00155                 toolbox::task::getWorkLoopFactory()->removeWorkLoop("SendData",
00156                                 "waiting");
00157         }
00158         if (wlSendDqm_) {
00159                 wlSendDqm_->cancel();
00160                 toolbox::task::getWorkLoopFactory()->removeWorkLoop("SendDqm",
00161                                 "waiting");
00162         }
00163         if (wlDiscard_) {
00164                 wlDiscard_->cancel();
00165                 toolbox::task::getWorkLoopFactory()->removeWorkLoop("Discard",
00166                                 "waiting");
00167         }
00168 
00169         if (wlMonitoring_) {
00170                 wlMonitoring_->cancel();
00171                 toolbox::task::getWorkLoopFactory()->removeWorkLoop("Monitoring",
00172                                 "waiting");
00173         }
00174         if (wlWatching_) {
00175                 wlWatching_->cancel();
00176                 toolbox::task::getWorkLoopFactory()->removeWorkLoop("Watching",
00177                                 "waiting");
00178         }
00179 }
00180 
00181 //______________________________________________________________________________
00182 void SharedResources::startMonitoringWorkLoop() throw (evf::Exception) {
00183 
00184         struct timezone timezone;
00185         gettimeofday(&monStartTime_, &timezone);
00186 
00187         try {
00188                 wlMonitoring_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
00189                                 sourceId_ + "Monitoring", "waiting");
00190                 if (!wlMonitoring_->isActive())
00191                         wlMonitoring_->activate();
00192                 asMonitoring_ = toolbox::task::bind(this, &SharedResources::monitoring,
00193                                 sourceId_ + "Monitoring");
00194                 wlMonitoring_->submit(asMonitoring_);
00195         } catch (xcept::Exception& e) {
00196                 string msg = "Failed to start workloop 'Monitoring'.";
00197                 XCEPT_RETHROW(evf::Exception, msg, e);
00198         }
00199 }
00200 
00201 //______________________________________________________________________________
00202 bool SharedResources::monitoring(toolbox::task::WorkLoop*) {
00203 
00204         unsigned int nbSent;
00205         uint64_t sumOfSquares;
00206         unsigned int sumOfSizes;
00207         uint64_t deltaSumOfSquares;
00208 
00209         lock();
00210         if (0 == resourceStructure_) {
00211                 deltaT_.value_ = 0.0;
00212                 deltaN_.value_ = 0;
00213                 deltaSumOfSquares_.value_ = 0.0;
00214                 deltaSumOfSizes_.value_ = 0;
00215                 throughput_ = 0.0;
00216                 rate_ = 0.0;
00217                 average_ = 0.0;
00218                 rms_ = 0.0;
00219                 unlock();
00220                 return false;
00221         } else {
00222                 nbSent = resourceStructure_->nbSent();
00223                 sumOfSquares = resourceStructure_->sumOfSquares();
00224                 sumOfSizes = resourceStructure_->sumOfSizes();
00225         }
00226         unlock();
00227 
00228         struct timeval monEndTime;
00229         struct timezone timezone;
00230 
00231         gettimeofday(&monEndTime, &timezone);
00232 
00233         xdata::getInfoSpaceFactory()->lock();
00234         gui_->monInfoSpace()->lock();
00235 
00236         deltaT_.value_ = deltaT(&monStartTime_, &monEndTime);
00237         monStartTime_ = monEndTime;
00238 
00239         deltaN_.value_ = nbSent - nbSentLast_;
00240         nbSentLast_ = nbSent;
00241 
00242         deltaSumOfSquares = sumOfSquares - sumOfSquaresLast_;
00243         deltaSumOfSquares_.value_ = (double) deltaSumOfSquares;
00244         sumOfSquaresLast_ = sumOfSquares;
00245 
00246         deltaSumOfSizes_.value_ = sumOfSizes - sumOfSizesLast_;
00247         sumOfSizesLast_ = sumOfSizes;
00248 
00249         if (deltaT_.value_ != 0) {
00250                 throughput_ = deltaSumOfSizes_.value_ / deltaT_.value_;
00251                 rate_ = deltaN_.value_ / deltaT_.value_;
00252         } else {
00253                 throughput_ = 0.0;
00254                 rate_ = 0.0;
00255         }
00256 
00257         double meanOfSquares, mean, squareOfMean, variance;
00258 
00259         if (deltaN_.value_ != 0) {
00260                 meanOfSquares = deltaSumOfSquares_.value_ / ((double) (deltaN_.value_));
00261                 mean = ((double) (deltaSumOfSizes_.value_))
00262                                 / ((double) (deltaN_.value_));
00263                 squareOfMean = mean * mean;
00264                 variance = meanOfSquares - squareOfMean;
00265                 if (variance < 0.0)
00266                         variance = 0.0;
00267 
00268                 average_ = deltaSumOfSizes_.value_ / deltaN_.value_;
00269                 rms_ = std::sqrt(variance);
00270         } else {
00271                 average_ = 0.0;
00272                 rms_ = 0.0;
00273         }
00274 
00275         gui_->monInfoSpace()->unlock();
00276         xdata::getInfoSpaceFactory()->unlock();
00277 
00278 	::sleep(monSleepSec_.value_);
00279 
00280         return true;
00281 }
00282 
00283 //______________________________________________________________________________
00284 void SharedResources::startWatchingWorkLoop() throw (evf::Exception) {
00285         try {
00286                 wlWatching_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
00287                                 sourceId_ + "Watching", "waiting");
00288                 if (!wlWatching_->isActive())
00289                         wlWatching_->activate();
00290                 asWatching_ = toolbox::task::bind(this, &SharedResources::watching,
00291                                 sourceId_ + "Watching");
00292                 wlWatching_->submit(asWatching_);
00293         } catch (xcept::Exception& e) {
00294                 string msg = "Failed to start workloop 'Watching'.";
00295                 XCEPT_RETHROW(evf::Exception, msg, e);
00296         }
00297 }
00298 
00299 //______________________________________________________________________________
00300 bool SharedResources::watching(toolbox::task::WorkLoop*) {
00301         lock();
00302         if (0 == resourceStructure_) {
00303                 unlock();
00304                 return false;
00305         }
00306 
00307         vector<pid_t> evt_prcids;
00308         vector<UInt_t> evt_numbers;
00309         vector<time_t> evt_tstamps;
00310         try {
00311                 evt_prcids = resourceStructure_->cellPrcIds();
00312                 evt_numbers = resourceStructure_->cellEvtNumbers();
00313                 evt_tstamps = resourceStructure_->cellTimeStamps();
00314         } catch (evf::Exception& e) {
00315                 goToFailedState(e);
00316         }
00317 
00318         time_t tcurr = time(0);
00319         for (UInt_t i = 0; i < evt_tstamps.size(); i++) {
00320                 pid_t pid = evt_prcids[i];
00321                 UInt_t evt = evt_numbers[i];
00322                 time_t tstamp = evt_tstamps[i];
00323                 if (tstamp == 0)
00324                         continue;
00325                 double tdiff = difftime(tcurr, tstamp);
00326                 if (tdiff > timeOutSec_) {
00327                         if (processKillerEnabled_) {
00328                                 // UPDATED
00329                                 kill(pid, 9);
00330                                 nbTimeoutsWithEvent_++;
00331                         }
00332                         LOG4CPLUS_ERROR(
00333                                         log_,
00334                                         "evt " << evt << " under processing for more than "
00335                                                         << timeOutSec_ << "sec for process " << pid);
00336 
00337                 }
00338         }
00339 
00340         vector<pid_t> prcids;
00341         try {
00342                 prcids = resourceStructure_->clientPrcIds();
00343                 for (UInt_t i = 0; i < prcids.size(); i++) {
00344                         pid_t pid = prcids[i];
00345                         int status = kill(pid, 0);
00346                         if (status != 0) {
00347                                 LOG4CPLUS_ERROR(
00348                                                 log_,
00349                                                 "EP prc " << pid
00350                                                                 << " died, send to error stream if processing.");
00351                                 if (!resourceStructure_->handleCrashedEP(runNumber_, pid))
00352                                         nbTimeoutsWithoutEvent_++;
00353                         }
00354                 }
00355         } catch (evf::Exception& e) {
00356                 goToFailedState(e);
00357         }
00358 
00359         try {
00360                 if ((resourceStructure_->nbResources() != nbRawCells_.value_)
00361                                 && !shmInconsistent_) {
00362                         std::ostringstream ost;
00363                         ost << "Watchdog spotted inconsistency in ResourceTable - nbRaw="
00364                                         << nbRawCells_.value_ << " but nbResources="
00365                                         << resourceStructure_->nbResources() << " and nbFreeSlots="
00366                                         << resourceStructure_->nbFreeSlots();
00367                         XCEPT_DECLARE(evf::Exception, sentinelException, ost.str());
00368                         fsm_->getApp()->notifyQualified("error", sentinelException);
00369 
00370                         // XXX shmInconsistent
00371                         shmInconsistent_ = true;
00372                 }
00373         } catch (evf::Exception& e) {
00374                 goToFailedState(e);
00375         }
00376 
00377         unlock();
00378 
00379 	::sleep(watchSleepSec_.value_);
00380         return true;
00381 }
00382 
00383 //______________________________________________________________________________
00384 double SharedResources::deltaT(const struct timeval *start,
00385                 const struct timeval *end) {
00386         unsigned int sec;
00387         unsigned int usec;
00388 
00389         sec = end->tv_sec - start->tv_sec;
00390 
00391         if (end->tv_usec > start->tv_usec) {
00392                 usec = end->tv_usec - start->tv_usec;
00393         } else {
00394                 sec--;
00395                 usec = 1000000 - ((unsigned int) (start->tv_usec - end->tv_usec));
00396         }
00397 
00398         return ((double) sec) + ((double) usec) / 1000000.0;
00399 }
00400 
00401 // sendData workloop STARTER
00402 //______________________________________________________________________________
00403 void SharedResources::startSendDataWorkLoop() throw (evf::Exception) {
00404         try {
00405                 LOG4CPLUS_INFO(log_, "Start 'send data' workloop.");
00406                 wlSendData_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
00407                                 "SendData", "waiting");
00408                 if (!wlSendData_->isActive())
00409                         wlSendData_->activate();
00410                 asSendData_ = toolbox::task::bind(this, &SharedResources::sendData,
00411                                 "SendData");
00412                 wlSendData_->submit(asSendData_);
00413 
00414         } catch (xcept::Exception& e) {
00415                 string msg = "Failed to start workloop 'SendData'.";
00416                 XCEPT_RETHROW(evf::Exception, msg, e);
00417         }
00418 }
00419 
00420 //  sendData workloop DISPATCHING SIGNATURE
00421 bool SharedResources::sendData(toolbox::task::WorkLoop*) {
00422         int currentStateID = -1;
00423         bool reschedule = true;
00424 
00425         fsm_->transitionReadLock();
00426         currentStateID = fsm_->getCurrentState().stateID();
00427         fsm_->transitionUnlock();
00428 
00429         try {
00430                 switch (currentStateID) {
00431                 case rb_statemachine::RUNNING:
00432                         reschedule = resourceStructure_->sendData();
00433                         break;
00434                 case rb_statemachine::STOPPING:
00435                         reschedule = resourceStructure_->sendData();
00436                         break;
00437                 case rb_statemachine::HALTING:
00438                         reschedule = resourceStructure_->sendDataWhileHalting();
00439                         break;
00440                 case rb_statemachine::FAILED:
00441                         // workloop must be exited in this state
00442                         return false;
00443                 default:
00444                         cout << "RBStateMachine: current state: " << currentStateID
00445                                         << " does not support action: >>sendData<<" << endl;
00446                         ::usleep(50000);
00447                         reschedule = true;
00448                 }
00449         } catch (evf::Exception& e) {
00450                 goToFailedState(e);
00451         }
00452 
00453         return reschedule;
00454 }
00455 
00456 // sendDqm workloop STARTER
00457 //______________________________________________________________________________
00458 void SharedResources::startSendDqmWorkLoop() throw (evf::Exception) {
00459         try {
00460                 LOG4CPLUS_INFO(log_, "Start 'send dqm' workloop.");
00461                 wlSendDqm_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
00462                                 "SendDqm", "waiting");
00463                 if (!wlSendDqm_->isActive())
00464                         wlSendDqm_->activate();
00465                 asSendDqm_ = toolbox::task::bind(this, &SharedResources::sendDqm,
00466                                 "SendDqm");
00467                 wlSendDqm_->submit(asSendDqm_);
00468 
00469         } catch (xcept::Exception& e) {
00470                 string msg = "Failed to start workloop 'SendDqm'.";
00471                 XCEPT_RETHROW(evf::Exception, msg, e);
00472         }
00473 }
00474 
00475 //  sendDqm workloop DISPATCHING SIGNATURE
00476 bool SharedResources::sendDqm(toolbox::task::WorkLoop*) {
00477         int currentStateID = -1;
00478         bool reschedule = true;
00479 
00480         fsm_->transitionReadLock();
00481         currentStateID = fsm_->getCurrentState().stateID();
00482         fsm_->transitionUnlock();
00483 
00484         try {
00485                 switch (currentStateID) {
00486                 case rb_statemachine::RUNNING:
00487                         reschedule = resourceStructure_->sendDqm();
00488                         break;
00489                 case rb_statemachine::STOPPING:
00490                         reschedule = resourceStructure_->sendDqm();
00491                         break;
00492                 case rb_statemachine::HALTING:
00493                         reschedule = resourceStructure_->sendDqmWhileHalting();
00494                         break;
00495                 case rb_statemachine::FAILED:
00496                         // workloop must be exited in this state
00497                         return false;
00498                 default:
00499                         cout << "RBStateMachine: current state: " << currentStateID
00500                                         << " does not support action: >>sendDqm<<" << endl;
00501                         ::usleep(50000);
00502                         reschedule = true;
00503                 }
00504         } catch (evf::Exception& e) {
00505                 goToFailedState(e);
00506         }
00507 
00508         return reschedule;
00509 }
00510 
00511 // discard workloop STARTER
00512 //______________________________________________________________________________
00513 void SharedResources::startDiscardWorkLoop() throw (evf::Exception) {
00514         try {
00515                 LOG4CPLUS_INFO(log_, "Start 'discard' workloop.");
00516                 wlDiscard_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
00517                                 "Discard", "waiting");
00518                 if (!wlDiscard_->isActive())
00519                         wlDiscard_->activate();
00520                 asDiscard_ = toolbox::task::bind(this, &SharedResources::discard,
00521                                 "Discard");
00522                 wlDiscard_->submit(asDiscard_);
00523                 resourceStructure_->setActive(true);
00524 
00525         } catch (xcept::Exception& e) {
00526                 string msg = "Failed to start workloop 'Discard'.";
00527                 XCEPT_RETHROW(evf::Exception, msg, e);
00528         }
00529         resourceStructure_->setReadyToShutDown(false);
00530 }
00531 
00532 //  discard workloop DISPATCHING SIGNATURE
00533 bool SharedResources::discard(toolbox::task::WorkLoop*) {
00534         int currentStateID = -1;
00535         bool reschedule = true;
00536 
00537         fsm_->transitionReadLock();
00538         currentStateID = fsm_->getCurrentState().stateID();
00539         fsm_->transitionUnlock();
00540         try {
00541                 switch (currentStateID) {
00542                 case rb_statemachine::RUNNING:
00543                         reschedule = resourceStructure_->discard();
00544                         break;
00545                 case rb_statemachine::STOPPING:
00546                         // XXX: communication with BU after stop!
00547                         reschedule = resourceStructure_->discardWhileHalting(true);
00548                         break;
00549                 case rb_statemachine::HALTING:
00550                         // XXX: no more communication with BU after halt!
00551                         reschedule = resourceStructure_->discardWhileHalting(false);
00552                         break;
00553                 case rb_statemachine::FAILED:
00554                         // workloop must be exited in this state
00555                         return false;
00556                 default:
00557                         cout << "RBStateMachine: current state: " << currentStateID
00558                                         << " does not support action: >>discard<<" << endl;
00559                         ::usleep(50000);
00560                         reschedule = true;
00561                 }
00562         } catch (evf::Exception& e) {
00563                 goToFailedState(e);
00564         }
00565 
00566         return reschedule;
00567 }
00568 
00569 //______________________________________________________________________________
00570 void SharedResources::printWorkLoopStatus() {
00571         cout << "Workloop status===============" << endl;
00572         cout << "==============================" << endl;
00573         if (wlSendData_ != 0)
00574                 cout << "SendData -> " << wlSendData_->isActive() << endl;
00575         if (wlSendDqm_ != 0)
00576                 cout << "SendDqm  -> " << wlSendDqm_->isActive() << endl;
00577         if (wlDiscard_ != 0)
00578                 cout << "Discard  -> " << wlDiscard_->isActive() << endl;
00579         //cout << "Workloops Active  -> " << isActive_ << endl;
00580 }
00581 
00582 //______________________________________________________________________________
00583 void SharedResources::goToFailedState(evf::Exception& exception) {
00584         reasonForFailed_ = exception.what();
00585         LOG4CPLUS_FATAL(log_,
00586                         "Moving to FAILED state! Reason: " << exception.what());
00587         EventPtr fail(new Fail());
00588         commands_.enqEvent(fail);
00589 }