22 using namespace evf::rb_statemachine;
42 resourceStructure_(0),
46 deltaSumOfSquares_(0),
52 nbAllocatedEvents_(0),
53 nbPendingRequests_(0),
57 nbSentErrorEvents_(0),
58 nbPendingSMDiscards_(0),
59 nbPendingSMDqmDiscards_(0),
60 nbDiscardedEvents_(0),
63 highestEolReceived_(0),
69 nbTimeoutsWithEvent_(0),
70 nbTimeoutsWithoutEvent_(0),
72 segmentationMode_(
false),
73 useMessageQueueIPC_(
false),
79 rawCellSize_(0x400000)
81 recoCellSize_(0x800000)
83 dqmCellSize_(0x800000)
85 , freeResRequiredForAllocate_(-1), doDropEvents_(
false),
86 doFedIdCheck_(
true), doCrcCheck_(1), doDumpEvents_(0),
87 buClassName_(
"BU"), buInstance_(0), smClassName_(
"StorageManager"),
88 smInstance_(0), resourceStructureTimeout_(200000), monSleepSec_(2),
89 watchSleepSec_(10), timeOutSec_(30), processKillerEnabled_(
true),
90 useEvmBoard_(
true), reasonForFailed_(
""), nbAllocateSent_(0),
91 nbTakeReceived_(0), nbDataDiscardReceived_(0),
92 nbDqmDiscardReceived_(0), nbSentLast_(0), sumOfSquaresLast_(0),
93 sumOfSizesLast_(0), frb_(0), shmInconsistent_(
false),
94 allowI2ODiscards_(
true) {
96 sem_init(&
lock_, 0, 1);
155 toolbox::task::getWorkLoopFactory()->removeWorkLoop(
"SendData",
160 toolbox::task::getWorkLoopFactory()->removeWorkLoop(
"SendDqm",
165 toolbox::task::getWorkLoopFactory()->removeWorkLoop(
"Discard",
171 toolbox::task::getWorkLoopFactory()->removeWorkLoop(
"Monitoring",
176 toolbox::task::getWorkLoopFactory()->removeWorkLoop(
"Watching",
184 struct timezone timezone;
188 wlMonitoring_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
196 string msg =
"Failed to start workloop 'Monitoring'.";
206 unsigned int sumOfSizes;
228 struct timeval monEndTime;
229 struct timezone timezone;
231 gettimeofday(&monEndTime, &timezone);
233 xdata::getInfoSpaceFactory()->lock();
240 nbSentLast_ = nbSent;
244 sumOfSquaresLast_ = sumOfSquares;
247 sumOfSizesLast_ = sumOfSizes;
257 double meanOfSquares,
mean, squareOfMean, variance;
263 squareOfMean = mean *
mean;
264 variance = meanOfSquares - squareOfMean;
276 xdata::getInfoSpaceFactory()->unlock();
286 wlWatching_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
294 string msg =
"Failed to start workloop 'Watching'.";
307 vector<pid_t> evt_prcids;
308 vector<UInt_t> evt_numbers;
309 vector<time_t> evt_tstamps;
318 time_t tcurr =
time(0);
319 for (
UInt_t i = 0;
i < evt_tstamps.size();
i++) {
320 pid_t
pid = evt_prcids[
i];
322 time_t tstamp = evt_tstamps[
i];
325 double tdiff = difftime(tcurr, tstamp);
334 "evt " << evt <<
" under processing for more than "
340 vector<pid_t> prcids;
346 for (
UInt_t i = 0;
i < prcids.size();
i++) {
347 pid_t
pid = prcids[
i];
348 int status = kill(pid, 0);
353 <<
" died, send to error stream if processing.");
366 std::ostringstream ost;
367 ost <<
"Watchdog spotted inconsistency in ResourceTable - nbRaw="
372 fsm_->
getApp()->notifyQualified(
"error", sentinelException);
389 const struct timeval *
end) {
393 sec = end->tv_sec - start->tv_sec;
395 if (end->tv_usec > start->tv_usec) {
396 usec = end->tv_usec - start->tv_usec;
399 usec = 1000000 - ((
unsigned int) (start->tv_usec - end->tv_usec));
402 return ((
double) sec) + ((
double) usec) / 1000000.0;
409 LOG4CPLUS_INFO(
log_,
"Start 'send data' workloop.");
410 wlSendData_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
411 "SendData",
"waiting");
419 string msg =
"Failed to start workloop 'SendData'.";
426 int currentStateID = -1;
427 bool reschedule =
true;
434 switch (currentStateID) {
448 cout <<
"RBStateMachine: current state: " << currentStateID
449 <<
" does not support action: >>sendData<<" << endl;
464 LOG4CPLUS_INFO(
log_,
"Start 'send dqm' workloop.");
465 wlSendDqm_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
466 "SendDqm",
"waiting");
474 string msg =
"Failed to start workloop 'SendDqm'.";
481 int currentStateID = -1;
482 bool reschedule =
true;
489 switch (currentStateID) {
503 cout <<
"RBStateMachine: current state: " << currentStateID
504 <<
" does not support action: >>sendDqm<<" << endl;
519 LOG4CPLUS_INFO(
log_,
"Start 'discard' workloop.");
520 wlDiscard_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
521 "Discard",
"waiting");
530 string msg =
"Failed to start workloop 'Discard'.";
538 int currentStateID = -1;
539 bool reschedule =
true;
545 switch (currentStateID) {
561 cout <<
"RBStateMachine: current state: " << currentStateID
562 <<
" does not support action: >>discard<<" << endl;
575 cout <<
"Workloop status===============" << endl;
576 cout <<
"==============================" << endl;
589 LOG4CPLUS_FATAL(
log_,
590 "Moving to FAILED state! Reason: " << exception.what());
virtual bool sendDqmWhileHalting()=0
void enqEvent(EventPtr event)
static const char runNumber_[]
virtual std::vector< pid_t > clientPrcIds() const =0
void cancelAllWorkloops()
double deltaT(const struct timeval *start, const struct timeval *end)
ActionSignature_t * asDiscard_
ActionSignature_t * asWatching_
tuple start
Check for commandline option errors.
ActionSignature_t * asSendData_
virtual int stateID() const =0
void transitionReadLock()
xdata::Integer freeResRequiredForAllocate_
xdata::UnsignedInteger32 recoCellSize_
bool discard(toolbox::task::WorkLoop *wl)
virtual std::vector< time_t > cellTimeStamps() const =0
void setDoCrcCheck(UInt_t doCrcCheck)
sem_t accessToResourceStructureLock_
xdata::UnsignedInteger32 nbRawCells_
void setActive(bool activeValue)
virtual std::vector< UInt_t > cellEvtNumbers() const =0
xdata::UnsignedInteger32 deltaN_
virtual bool discardWhileHalting(bool sendDiscards)=0
xdata::Double deltaSumOfSquares_
boost::shared_ptr< boost::statechart::event_base > EventPtr
xdata::UnsignedInteger32 resourceStructureTimeout_
static void useEvmBoard(bool useEvmBoard)
xdata::UnsignedInteger32 nbTimeoutsWithEvent_
xdata::UnsignedInteger32 doDumpEvents_
xdata::Boolean segmentationMode_
uint64_t sumOfSquaresLast_
bool sendData(toolbox::task::WorkLoop *wl)
WorkLoop_t * wlMonitoring_
xdata::UnsignedInteger32 nbDqmCells_
xdata::UnsignedInteger32 monSleepSec_
bool sendDqm(toolbox::task::WorkLoop *wl)
bool watching(toolbox::task::WorkLoop *wl)
UInt_t sumOfSizes() const
xdata::UnsignedInteger32 deltaSumOfSizes_
ActionSignature_t * asMonitoring_
ActionSignature_t * asSendDqm_
void startSendDqmWorkLoop()
xdata::Double throughput_
void printWorkLoopStatus()
xdata::String reasonForFailed_
void startDiscardWorkLoop()
void startSendDataWorkLoop()
void setDoDumpEvents(UInt_t doDumpEvents)
virtual bool handleCrashedEP(UInt_t runNumber, pid_t pid)=0
uint64_t sumOfSquares() const
xdata::UnsignedInteger32 watchSleepSec_
UInt_t nbFreeSlots() const
xdata::Boolean useEvmBoard_
unsigned long long uint64_t
IPCMethod * resourceStructure_
xdata::UnsignedInteger32 nbRecoCells_
xdata::UnsignedInteger32 runNumber_
xdata::Boolean doFedIdCheck_
bool monitoring(toolbox::task::WorkLoop *wl)
virtual UInt_t nbResources() const =0
xdata::Boolean processKillerEnabled_
void initialise(bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize, int freeResRequiredForAllocate, BUProxy *bu, SMProxy *sm, log4cplus::Logger logger, unsigned int resourceStructureTimeout, EvffedFillerRB *frb, xdaq::Application *)
void startWatchingWorkLoop()
xdata::InfoSpace * monInfoSpace()
xdaq::Application * getApp() const
xdata::UnsignedInteger32 nbTimeoutsWithoutEvent_
BaseState const & getCurrentState() const
void startMonitoringWorkLoop()
virtual bool sendData()=0
xdata::UnsignedInteger32 doCrcCheck_
xdata::Boolean useMessageQueueIPC_
static void doFedIdCheck(bool doFedIdCheck)
SharedResources(Logger log)
struct timeval monStartTime_
virtual std::vector< pid_t > cellPrcIds() const =0
xdata::UnsignedInteger32 rawCellSize_
xdata::UnsignedInteger32 dqmCellSize_
void goToFailedState(evf::Exception &e)
void setReadyToShutDown(bool readyValue)
virtual bool sendDataWhileHalting()=0
void configureResources(xdaq::Application *app)
xdata::UnsignedInteger32 timeOutSec_