CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_6_2_5/src/EventFilter/ResourceBroker/src/Stopping.cc

Go to the documentation of this file.
00001 
00006 #include "EventFilter/ResourceBroker/interface/RBStateMachine.h"
00007 //#include "EventFilter/ResourceBroker/interface/IPCMethod.h"
00008 #include "EventFilter/ResourceBroker/interface/SharedResources.h"
00009 
00010 #include <iostream>
00011 #include <vector>
00012 #include <sstream>
00013 
00014 using std::cout;
00015 using std::endl;
00016 using std::vector;
00017 using std::string;
00018 using std::ostringstream;
00019 using namespace evf::rb_statemachine;
00020 
00021 // entry action, state notification, state action
00022 //______________________________________________________________________________
00023 void Stopping::do_entryActionWork() {
00024 }
00025 
00026 void Stopping::do_stateNotify() {
00027         SharedResourcesPtr_t res = outermost_context().getSharedResources();
00028         LOG4CPLUS_INFO(res->log_, "--> ResourceBroker: NEW STATE: " << stateName());
00029         outermost_context().setExternallyVisibleState(stateName());
00030         outermost_context().setInternalStateName(stateName());
00031         // RCMS notification no longer required here
00032         // this is done in FUResourceBroker in SOAP reply
00033         //outermost_context().rcmsStateChangeNotify();
00034 }
00035 
00036 void Stopping::do_stateAction() const {
00037         SharedResourcesPtr_t res = outermost_context().getSharedResources();
00038 
00039         try {
00040                 LOG4CPLUS_INFO(res->log_, "Start stopping :) ...");
00041                 res->resourceStructure_->setStopFlag(true);
00042                 res->resourceStructure_->shutDownClients();
00043                 timeval now;
00044                 timeval then;
00045                 gettimeofday(&then, 0);
00046                 while (!res->resourceStructure_->isReadyToShutDown()) {
00047                         ::usleep(res->resourceStructureTimeout_.value_ * 10);
00048                         gettimeofday(&now, 0);
00049                         if ((unsigned int) (now.tv_sec - then.tv_sec)
00050                                         > res->resourceStructureTimeout_.value_ / 10000) {
00051                                 cout << "times: " << now.tv_sec << " " << then.tv_sec << " "
00052                                                 << res->resourceStructureTimeout_.value_ / 10000
00053                                                 << endl;
00054                                 LOG4CPLUS_WARN(res->log_,
00055                                                 "Some Process did not detach - going to Emergency stop! resource status:" 
00056                                                 << res->resourceStructure_->printStatus() );
00057 
00062                                 //try to acquire RS lock
00063                                 int count=5;
00064                                 do {
00065                                   if (!res->tryLockRSAccess()) break;
00066                                   usleep(100000);
00067                                 } while (--count);
00068                                 if (!count) XCEPT_RAISE(evf::Exception,"Can not acquire RS lock for the emergency stop!");
00069 
00070                                 emergencyStop();
00071 
00072                                 res->unlockRSAccess();
00073 
00074                                 break;
00075                         }
00076                 }
00077 
00078                 if (res->resourceStructure_->isReadyToShutDown()) {
00079                         // lock access to I2O discards (data & dqm)
00080                         res->lockRSAccess();
00081 
00082                         // if emergency stop was not triggered
00083                         if (res->allowI2ODiscards_) {
00084                                 // any I2O discards after this point are ignored
00085                                 res->allowI2ODiscards_ = false;
00086                                 // UPDATED: release resources
00087                                 res->resourceStructure_->releaseResources();
00088                                 // UPDATED: forget pending allocates to BU
00089                                 res->resourceStructure_->resetPendingAllocates();
00090                                 // UPDATE: reset the underlying IPC method
00091                                 res->resourceStructure_->resetIPC();
00092                         }
00093 
00094                         res->unlockRSAccess();
00095 
00096                         LOG4CPLUS_INFO(res->log_, "Finished stopping!");
00097                         EventPtr stopDone(new StopDone());
00098                         res->commands_.enqEvent(stopDone);
00099                 }
00100         } catch (xcept::Exception &e) {
00101                 moveToFailedState(e);
00102         }
00103 }
00104 
00105 /*
00106  * I2O capability
00107  */
00108 bool Stopping::discardDataEvent(MemRef_t* bufRef) const {
00109         SharedResourcesPtr_t res = outermost_context().getSharedResources();
00110         bool returnValue = false;
00111         try {
00112                 returnValue = res->resourceStructure_->discardDataEvent(bufRef);
00113         } catch (evf::Exception& e) {
00114                 moveToFailedState(e);
00115         }
00116         return returnValue;
00117 }
00118 bool Stopping::discardDqmEvent(MemRef_t* bufRef) const {
00119         SharedResourcesPtr_t res = outermost_context().getSharedResources();
00120         bool returnValue = false;
00121         try {
00122                 returnValue = res->resourceStructure_->discardDqmEvent(bufRef);
00123                 //returnValue = res->resourceStructure_->discardDqmEventWhileHalting(bufRef);
00124         } catch (evf::Exception& e) {
00125                 moveToFailedState(e);
00126         }
00127         return returnValue;
00128 }
00129 
00130 // construction / destruction
00131 //______________________________________________________________________________
00132 Stopping::Stopping(my_context c) :
00133         my_base(c) {
00134         safeEntryAction();
00135 }
00136 
00137 Stopping::~Stopping() {
00138         safeExitAction();
00139 }
00140 
00141 void Stopping::emergencyStop() const {
00142         SharedResourcesPtr_t res = outermost_context().getSharedResources();
00143         IPCMethod* resourceStructure = res->resourceStructure_;
00144 
00145         LOG4CPLUS_WARN(res->log_, "in Emergency stop - handle non-clean stops");
00146 
00147         // UPDATE: while in emergency stop I2O discards from SM are not allowed
00148         // they are re-allowed after a new enable
00149         res->allowI2ODiscards_ = false;
00150         {
00151                 #ifdef linux
00152                 auto lk = resourceStructure->lockCrashHandlerTimed(10);
00153                 #else
00154                 bool lk=true;
00155                 #endif
00156                 if (lk) { 
00157                         vector < pid_t > client_prc_ids = resourceStructure->clientPrcIds();
00158                         for (UInt_t i = 0; i < client_prc_ids.size(); i++) {
00159                                 pid_t pid = client_prc_ids[i];
00160                                 cout << "B: killing process " << i << " pid= " << pid << endl;
00161                                 if (pid != 0) {
00162                                         //assume processes are dead by now
00163                                         if (!resourceStructure->handleCrashedEP(res->runNumber_, pid))
00164                                                 res->nbTimeoutsWithoutEvent_++;
00165                                         else
00166                                                 res->nbTimeoutsWithEvent_++;
00167                                 }
00168                         }
00169                 }
00170                 else {
00171                   XCEPT_RAISE(evf::Exception, 
00172                         "Timed out accessing the EP Crash Handler in emergency stop. SM discards not arriving?");
00173                 }
00174         }
00175         LOG4CPLUS_WARN(res->log_, "in Emergency stop - running lastResort");
00176         resourceStructure->lastResort();
00177 	::sleep(1);
00178         if (!resourceStructure->isReadyToShutDown()) {
00179                 UInt_t shutdownStatus = resourceStructure->shutdownStatus();
00180                 std::ostringstream ostr;
00181                 ostr << "EmergencyStop: failed to shut down ResourceTable. Debug info mask:" << std::hex <<  shutdownStatus;
00182                 res->reasonForFailed_ = ostr.str();
00183                 XCEPT_RAISE(evf::Exception, res->reasonForFailed_);
00184         }
00185 
00186         res->printWorkLoopStatus();
00187         res->lock();
00188 
00189         LOG4CPLUS_WARN(res->log_, "Deleting the resource structure!");
00190         delete res->resourceStructure_;
00191         res->resourceStructure_ = 0;
00192 
00193         cout << "cycle through resourcetable config " << endl;
00194         res->configureResources(outermost_context().getApp());
00195         res->unlock();
00196         if (res->shmInconsistent_)
00197                 XCEPT_RAISE(evf::Exception, "Inconsistent shm state");
00198         cout << "done with emergency stop" << endl;
00199 }
00200 
00201 // exit action, state name, move to failed state
00202 //______________________________________________________________________________
00203 void Stopping::do_exitActionWork() {
00204 }
00205 
00206 string Stopping::do_stateName() const {
00207         return std::string("Stopping");
00208 }
00209 
00210 void Stopping::do_moveToFailedState(xcept::Exception& exception) const {
00211         SharedResourcesPtr_t res = outermost_context().getSharedResources();
00212         res->reasonForFailed_ = exception.what();
00213         LOG4CPLUS_FATAL(res->log_,
00214                         "Moving to FAILED state! Reason: " << exception.what());
00215         EventPtr fail(new Fail());
00216         res->commands_.enqEvent(fail);
00217 }