CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
SharedResources.cc
Go to the documentation of this file.
1 //
3 // SharedResources.cc
4 // -------
5 //
6 // Resources shared between FSM states.
7 //
8 // Created on: Sep 21, 2011
9 // Andrei Spataru : aspataru@cern.ch
11 
13 
14 #include <signal.h>
15 #include <iostream>
16 
17 using std::string;
18 using std::vector;
19 using std::cout;
20 using std::endl;
21 
22 using namespace evf::rb_statemachine;
23 
25  wlMonitoring_(0),
26  asMonitoring_(0),
27  wlWatching_(0),
28  asWatching_(0),
29  wlSendData_(0),
30  asSendData_(0),
31  wlSendDqm_(0),
32  asSendDqm_(0),
33  wlDiscard_(0),
34  asDiscard_(0),
35  gui_(0),
36  commands_(CommandQueue()),
37  log_(log),
38  bu_(0),
39  sm_(0),
40  i2oPool_(0),
41  ipcManager_(0),
42  resourceStructure_(0),
43  runNumber_(0),
44  deltaT_(0.0),
45  deltaN_(0),
46  deltaSumOfSquares_(0),
47  deltaSumOfSizes_(0),
48  throughput_(0.0),
49  rate_(0.0),
50  average_(0.0),
51  rms_(0.0),
52  nbAllocatedEvents_(0),
53  nbPendingRequests_(0),
54  nbReceivedEvents_(0),
55  nbSentEvents_(0),
56  nbSentDqmEvents_(0),
57  nbSentErrorEvents_(0),
58  nbPendingSMDiscards_(0),
59  nbPendingSMDqmDiscards_(0),
60  nbDiscardedEvents_(0),
61  // UPDATED
62  nbReceivedEol_(0),
63  highestEolReceived_(0),
64  nbEolPosted_(0),
65  nbEolDiscarded_(0),
66  nbLostEvents_(0),
67  nbDataErrors_(0),
68  nbCrcErrors_(0),
69  nbTimeoutsWithEvent_(0),
70  nbTimeoutsWithoutEvent_(0),
71  dataErrorFlag_(0),
72  segmentationMode_(false),
73  useMessageQueueIPC_(false),
74  nbClients_(0),
75  clientPrcIds_(""),
76  nbRawCells_(16),
77  nbRecoCells_(8),
78  nbDqmCells_(8),
79  rawCellSize_(0x400000) // 4MB
80  ,
81  recoCellSize_(0x800000) // 8MB
82  ,
83  dqmCellSize_(0x800000) // 8MB
84  // at least nbRawCells / 2 free resources to send allocate
85  , freeResRequiredForAllocate_(-1), doDropEvents_(false),
86  doFedIdCheck_(true), doCrcCheck_(1), doDumpEvents_(0),
87  buClassName_("BU"), buInstance_(0), smClassName_("StorageManager"),
88  smInstance_(0), resourceStructureTimeout_(200000), monSleepSec_(2),
89  watchSleepSec_(10), timeOutSec_(30), processKillerEnabled_(true),
90  useEvmBoard_(true), reasonForFailed_(""), nbAllocateSent_(0),
91  nbTakeReceived_(0), nbDataDiscardReceived_(0),
92  nbDqmDiscardReceived_(0), nbSentLast_(0), sumOfSquaresLast_(0),
93  sumOfSizesLast_(0), frb_(0), shmInconsistent_(false),
94  allowI2ODiscards_(true) {
95 
96  sem_init(&lock_, 0, 1);
97  sem_init(&accessToResourceStructureLock_, 0, 1);
98 
99 }
100 
102 
103 }
104 
105 //______________________________________________________________________________
106 void SharedResources::configureResources(xdaq::Application* app) {
107 
109 
111  nbRecoCells_.value_, nbDqmCells_.value_, rawCellSize_.value_,
112  recoCellSize_.value_, dqmCellSize_.value_,
114  resourceStructureTimeout_.value_, frb_, app);
115 
117 
122  reset();
123  shmInconsistent_ = false;
124 
125  // XXX shmInconsistent check
128  shmInconsistent_ = true;
129 }
130 
131 //______________________________________________________________________________
133 
134  gui_->resetCounters();
135 
136  deltaT_ = 0.0;
137  deltaN_ = 0;
138  deltaSumOfSquares_ = 0.0;
139  deltaSumOfSizes_ = 0;
140 
141  throughput_ = 0.0;
142  rate_ = 0.0;
143  average_ = 0.0;
144  rms_ = 0.0;
145 
146  nbSentLast_ = 0;
147  sumOfSquaresLast_ = 0;
148  sumOfSizesLast_ = 0;
149 }
150 
151 //______________________________________________________________________________
153  if (wlSendData_) {
154  wlSendData_->cancel();
155  toolbox::task::getWorkLoopFactory()->removeWorkLoop("SendData",
156  "waiting");
157  }
158  if (wlSendDqm_) {
159  wlSendDqm_->cancel();
160  toolbox::task::getWorkLoopFactory()->removeWorkLoop("SendDqm",
161  "waiting");
162  }
163  if (wlDiscard_) {
164  wlDiscard_->cancel();
165  toolbox::task::getWorkLoopFactory()->removeWorkLoop("Discard",
166  "waiting");
167  }
168 
169  if (wlMonitoring_) {
170  wlMonitoring_->cancel();
171  toolbox::task::getWorkLoopFactory()->removeWorkLoop("Monitoring",
172  "waiting");
173  }
174  if (wlWatching_) {
175  wlWatching_->cancel();
176  toolbox::task::getWorkLoopFactory()->removeWorkLoop("Watching",
177  "waiting");
178  }
179 }
180 
181 //______________________________________________________________________________
183 
184  struct timezone timezone;
185  gettimeofday(&monStartTime_, &timezone);
186 
187  try {
188  wlMonitoring_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
189  sourceId_ + "Monitoring", "waiting");
190  if (!wlMonitoring_->isActive())
191  wlMonitoring_->activate();
192  asMonitoring_ = toolbox::task::bind(this, &SharedResources::monitoring,
193  sourceId_ + "Monitoring");
194  wlMonitoring_->submit(asMonitoring_);
195  } catch (xcept::Exception& e) {
196  string msg = "Failed to start workloop 'Monitoring'.";
197  XCEPT_RETHROW(evf::Exception, msg, e);
198  }
199 }
200 
201 //______________________________________________________________________________
202 bool SharedResources::monitoring(toolbox::task::WorkLoop*) {
203 
204  unsigned int nbSent;
205  uint64_t sumOfSquares;
206  unsigned int sumOfSizes;
207  uint64_t deltaSumOfSquares;
208 
209  lock();
210  if (0 == resourceStructure_) {
211  deltaT_.value_ = 0.0;
212  deltaN_.value_ = 0;
213  deltaSumOfSquares_.value_ = 0.0;
214  deltaSumOfSizes_.value_ = 0;
215  throughput_ = 0.0;
216  rate_ = 0.0;
217  average_ = 0.0;
218  rms_ = 0.0;
219  unlock();
220  return false;
221  } else {
222  nbSent = resourceStructure_->nbSent();
223  sumOfSquares = resourceStructure_->sumOfSquares();
224  sumOfSizes = resourceStructure_->sumOfSizes();
225  }
226  unlock();
227 
228  struct timeval monEndTime;
229  struct timezone timezone;
230 
231  gettimeofday(&monEndTime, &timezone);
232 
233  xdata::getInfoSpaceFactory()->lock();
234  gui_->monInfoSpace()->lock();
235 
236  deltaT_.value_ = deltaT(&monStartTime_, &monEndTime);
237  monStartTime_ = monEndTime;
238 
239  deltaN_.value_ = nbSent - nbSentLast_;
240  nbSentLast_ = nbSent;
241 
242  deltaSumOfSquares = sumOfSquares - sumOfSquaresLast_;
243  deltaSumOfSquares_.value_ = (double) deltaSumOfSquares;
244  sumOfSquaresLast_ = sumOfSquares;
245 
246  deltaSumOfSizes_.value_ = sumOfSizes - sumOfSizesLast_;
247  sumOfSizesLast_ = sumOfSizes;
248 
249  if (deltaT_.value_ != 0) {
250  throughput_ = deltaSumOfSizes_.value_ / deltaT_.value_;
251  rate_ = deltaN_.value_ / deltaT_.value_;
252  } else {
253  throughput_ = 0.0;
254  rate_ = 0.0;
255  }
256 
257  double meanOfSquares, mean, squareOfMean, variance;
258 
259  if (deltaN_.value_ != 0) {
260  meanOfSquares = deltaSumOfSquares_.value_ / ((double) (deltaN_.value_));
261  mean = ((double) (deltaSumOfSizes_.value_))
262  / ((double) (deltaN_.value_));
263  squareOfMean = mean * mean;
264  variance = meanOfSquares - squareOfMean;
265  if (variance < 0.0)
266  variance = 0.0;
267 
268  average_ = deltaSumOfSizes_.value_ / deltaN_.value_;
269  rms_ = std::sqrt(variance);
270  } else {
271  average_ = 0.0;
272  rms_ = 0.0;
273  }
274 
275  gui_->monInfoSpace()->unlock();
276  xdata::getInfoSpaceFactory()->unlock();
277 
278  ::sleep(monSleepSec_.value_);
279 
280  return true;
281 }
282 
283 //______________________________________________________________________________
285  try {
286  wlWatching_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
287  sourceId_ + "Watching", "waiting");
288  if (!wlWatching_->isActive())
289  wlWatching_->activate();
290  asWatching_ = toolbox::task::bind(this, &SharedResources::watching,
291  sourceId_ + "Watching");
292  wlWatching_->submit(asWatching_);
293  } catch (xcept::Exception& e) {
294  string msg = "Failed to start workloop 'Watching'.";
295  XCEPT_RETHROW(evf::Exception, msg, e);
296  }
297 }
298 
299 //______________________________________________________________________________
300 bool SharedResources::watching(toolbox::task::WorkLoop*) {
301  lock();
302  if (0 == resourceStructure_) {
303  unlock();
304  return false;
305  }
306 
307  vector<pid_t> evt_prcids;
308  vector<UInt_t> evt_numbers;
309  vector<time_t> evt_tstamps;
310  try {
311  evt_prcids = resourceStructure_->cellPrcIds();
312  evt_numbers = resourceStructure_->cellEvtNumbers();
313  evt_tstamps = resourceStructure_->cellTimeStamps();
314  } catch (evf::Exception& e) {
315  goToFailedState(e);
316  }
317 
318  time_t tcurr = time(0);
319  for (UInt_t i = 0; i < evt_tstamps.size(); i++) {
320  pid_t pid = evt_prcids[i];
321  UInt_t evt = evt_numbers[i];
322  time_t tstamp = evt_tstamps[i];
323  if (tstamp == 0)
324  continue;
325  double tdiff = difftime(tcurr, tstamp);
326  if (tdiff > timeOutSec_) {
327  if (processKillerEnabled_) {
328  // UPDATED
329  kill(pid, 9);
331  }
332  LOG4CPLUS_ERROR(
333  log_,
334  "evt " << evt << " under processing for more than "
335  << timeOutSec_ << "sec for process " << pid);
336 
337  }
338  }
339 
340  vector<pid_t> prcids;
341  try {
342  #ifdef linux
343  auto lk = resourceStructure_->lockCrashHandler();
344  #endif
345  prcids = resourceStructure_->clientPrcIds();
346  for (UInt_t i = 0; i < prcids.size(); i++) {
347  pid_t pid = prcids[i];
348  int status = kill(pid, 0);
349  if (status != 0) {
350  LOG4CPLUS_ERROR(
351  log_,
352  "EP prc " << pid
353  << " died, send to error stream if processing.");
356  }
357  }
358 
359  } catch (evf::Exception& e) {
360  goToFailedState(e);
361  }
362 
363  try {
364  if ((resourceStructure_->nbResources() != nbRawCells_.value_)
365  && !shmInconsistent_) {
366  std::ostringstream ost;
367  ost << "Watchdog spotted inconsistency in ResourceTable - nbRaw="
368  << nbRawCells_.value_ << " but nbResources="
369  << resourceStructure_->nbResources() << " and nbFreeSlots="
371  XCEPT_DECLARE(evf::Exception, sentinelException, ost.str());
372  fsm_->getApp()->notifyQualified("error", sentinelException);
373 
374  // XXX shmInconsistent
375  shmInconsistent_ = true;
376  }
377  } catch (evf::Exception& e) {
378  goToFailedState(e);
379  }
380 
381  unlock();
382 
383  ::sleep(watchSleepSec_.value_);
384  return true;
385 }
386 
387 //______________________________________________________________________________
388 double SharedResources::deltaT(const struct timeval *start,
389  const struct timeval *end) {
390  unsigned int sec;
391  unsigned int usec;
392 
393  sec = end->tv_sec - start->tv_sec;
394 
395  if (end->tv_usec > start->tv_usec) {
396  usec = end->tv_usec - start->tv_usec;
397  } else {
398  sec--;
399  usec = 1000000 - ((unsigned int) (start->tv_usec - end->tv_usec));
400  }
401 
402  return ((double) sec) + ((double) usec) / 1000000.0;
403 }
404 
405 // sendData workloop STARTER
406 //______________________________________________________________________________
408  try {
409  LOG4CPLUS_INFO(log_, "Start 'send data' workloop.");
410  wlSendData_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
411  "SendData", "waiting");
412  if (!wlSendData_->isActive())
413  wlSendData_->activate();
414  asSendData_ = toolbox::task::bind(this, &SharedResources::sendData,
415  "SendData");
416  wlSendData_->submit(asSendData_);
417 
418  } catch (xcept::Exception& e) {
419  string msg = "Failed to start workloop 'SendData'.";
420  XCEPT_RETHROW(evf::Exception, msg, e);
421  }
422 }
423 
424 // sendData workloop DISPATCHING SIGNATURE
425 bool SharedResources::sendData(toolbox::task::WorkLoop*) {
426  int currentStateID = -1;
427  bool reschedule = true;
428 
430  currentStateID = fsm_->getCurrentState().stateID();
432 
433  try {
434  switch (currentStateID) {
436  reschedule = resourceStructure_->sendData();
437  break;
439  reschedule = resourceStructure_->sendData();
440  break;
443  break;
445  // workloop must be exited in this state
446  return false;
447  default:
448  cout << "RBStateMachine: current state: " << currentStateID
449  << " does not support action: >>sendData<<" << endl;
450  ::usleep(50000);
451  reschedule = true;
452  }
453  } catch (evf::Exception& e) {
454  goToFailedState(e);
455  }
456 
457  return reschedule;
458 }
459 
460 // sendDqm workloop STARTER
461 //______________________________________________________________________________
463  try {
464  LOG4CPLUS_INFO(log_, "Start 'send dqm' workloop.");
465  wlSendDqm_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
466  "SendDqm", "waiting");
467  if (!wlSendDqm_->isActive())
468  wlSendDqm_->activate();
469  asSendDqm_ = toolbox::task::bind(this, &SharedResources::sendDqm,
470  "SendDqm");
471  wlSendDqm_->submit(asSendDqm_);
472 
473  } catch (xcept::Exception& e) {
474  string msg = "Failed to start workloop 'SendDqm'.";
475  XCEPT_RETHROW(evf::Exception, msg, e);
476  }
477 }
478 
479 // sendDqm workloop DISPATCHING SIGNATURE
480 bool SharedResources::sendDqm(toolbox::task::WorkLoop*) {
481  int currentStateID = -1;
482  bool reschedule = true;
483 
485  currentStateID = fsm_->getCurrentState().stateID();
487 
488  try {
489  switch (currentStateID) {
491  reschedule = resourceStructure_->sendDqm();
492  break;
494  reschedule = resourceStructure_->sendDqm();
495  break;
497  reschedule = resourceStructure_->sendDqmWhileHalting();
498  break;
500  // workloop must be exited in this state
501  return false;
502  default:
503  cout << "RBStateMachine: current state: " << currentStateID
504  << " does not support action: >>sendDqm<<" << endl;
505  ::usleep(50000);
506  reschedule = true;
507  }
508  } catch (evf::Exception& e) {
509  goToFailedState(e);
510  }
511 
512  return reschedule;
513 }
514 
515 // discard workloop STARTER
516 //______________________________________________________________________________
518  try {
519  LOG4CPLUS_INFO(log_, "Start 'discard' workloop.");
520  wlDiscard_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
521  "Discard", "waiting");
522  if (!wlDiscard_->isActive())
523  wlDiscard_->activate();
524  asDiscard_ = toolbox::task::bind(this, &SharedResources::discard,
525  "Discard");
526  wlDiscard_->submit(asDiscard_);
528 
529  } catch (xcept::Exception& e) {
530  string msg = "Failed to start workloop 'Discard'.";
531  XCEPT_RETHROW(evf::Exception, msg, e);
532  }
534 }
535 
536 // discard workloop DISPATCHING SIGNATURE
537 bool SharedResources::discard(toolbox::task::WorkLoop*) {
538  int currentStateID = -1;
539  bool reschedule = true;
540 
542  currentStateID = fsm_->getCurrentState().stateID();
544  try {
545  switch (currentStateID) {
547  reschedule = resourceStructure_->discard();
548  break;
550  // XXX: communication with BU after stop!
551  reschedule = resourceStructure_->discardWhileHalting(true);
552  break;
554  // XXX: no more communication with BU after halt!
555  reschedule = resourceStructure_->discardWhileHalting(false);
556  break;
558  // workloop must be exited in this state
559  return false;
560  default:
561  cout << "RBStateMachine: current state: " << currentStateID
562  << " does not support action: >>discard<<" << endl;
563  ::usleep(50000);
564  reschedule = true;
565  }
566  } catch (evf::Exception& e) {
567  goToFailedState(e);
568  }
569 
570  return reschedule;
571 }
572 
573 //______________________________________________________________________________
575  cout << "Workloop status===============" << endl;
576  cout << "==============================" << endl;
577  if (wlSendData_ != 0)
578  cout << "SendData -> " << wlSendData_->isActive() << endl;
579  if (wlSendDqm_ != 0)
580  cout << "SendDqm -> " << wlSendDqm_->isActive() << endl;
581  if (wlDiscard_ != 0)
582  cout << "Discard -> " << wlDiscard_->isActive() << endl;
583  //cout << "Workloops Active -> " << isActive_ << endl;
584 }
585 
586 //______________________________________________________________________________
588  reasonForFailed_ = exception.what();
589  LOG4CPLUS_FATAL(log_,
590  "Moving to FAILED state! Reason: " << exception.what());
591  EventPtr fail(new Fail());
592  commands_.enqEvent(fail);
593 }
virtual bool sendDqmWhileHalting()=0
static const char runNumber_[]
virtual std::vector< pid_t > clientPrcIds() const =0
int i
Definition: DBlmapReader.cc:9
double deltaT(const struct timeval *start, const struct timeval *end)
virtual int stateID() const =0
xdata::UnsignedInteger32 recoCellSize_
bool discard(toolbox::task::WorkLoop *wl)
virtual std::vector< time_t > cellTimeStamps() const =0
void setDoCrcCheck(UInt_t doCrcCheck)
Definition: IPCMethod.h:161
xdata::UnsignedInteger32 nbRawCells_
void setActive(bool activeValue)
Definition: IPCMethod.h:179
virtual std::vector< UInt_t > cellEvtNumbers() const =0
reject
Definition: HLTenums.h:23
virtual bool discardWhileHalting(bool sendDiscards)=0
boost::shared_ptr< boost::statechart::event_base > EventPtr
Definition: CommandQueue.h:23
xdata::UnsignedInteger32 resourceStructureTimeout_
static void useEvmBoard(bool useEvmBoard)
Definition: FUResource.h:101
void sleep(Duration_t)
Definition: Utils.h:163
xdata::UnsignedInteger32 nbTimeoutsWithEvent_
virtual bool discard()=0
xdata::UnsignedInteger32 doDumpEvents_
bool sendData(toolbox::task::WorkLoop *wl)
virtual bool sendDqm()=0
xdata::UnsignedInteger32 nbDqmCells_
xdata::UnsignedInteger32 monSleepSec_
bool sendDqm(toolbox::task::WorkLoop *wl)
bool watching(toolbox::task::WorkLoop *wl)
UInt_t sumOfSizes() const
Definition: IPCMethod.h:259
xdata::UnsignedInteger32 deltaSumOfSizes_
T sqrt(T t)
Definition: SSEVec.h:46
#define end
Definition: vmac.h:38
void setDoDumpEvents(UInt_t doDumpEvents)
Definition: IPCMethod.h:168
virtual bool handleCrashedEP(UInt_t runNumber, pid_t pid)=0
uint64_t sumOfSquares() const
Definition: IPCMethod.h:256
xdata::UnsignedInteger32 watchSleepSec_
UInt_t nbFreeSlots() const
Definition: IPCMethod.h:204
IPCMethod *const ipc()
Definition: IPCManager.cc:59
unsigned int UInt_t
Definition: FUTypes.h:12
unsigned long long uint64_t
Definition: Time.h:15
xdata::UnsignedInteger32 nbRecoCells_
xdata::UnsignedInteger32 runNumber_
bool monitoring(toolbox::task::WorkLoop *wl)
virtual UInt_t nbResources() const =0
void initialise(bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize, int freeResRequiredForAllocate, BUProxy *bu, SMProxy *sm, log4cplus::Logger logger, unsigned int resourceStructureTimeout, EvffedFillerRB *frb, xdaq::Application *)
Definition: IPCManager.cc:29
xdata::InfoSpace * monInfoSpace()
xdaq::Application * getApp() const
xdata::UnsignedInteger32 nbTimeoutsWithoutEvent_
BaseState const & getCurrentState() const
virtual bool sendData()=0
xdata::UnsignedInteger32 doCrcCheck_
static void doFedIdCheck(bool doFedIdCheck)
Definition: FUResource.h:97
tuple cout
Definition: gather_cfg.py:121
UInt_t nbSent() const
Definition: IPCMethod.h:216
virtual std::vector< pid_t > cellPrcIds() const =0
tuple status
Definition: ntuplemaker.py:245
xdata::UnsignedInteger32 rawCellSize_
xdata::UnsignedInteger32 dqmCellSize_
void goToFailedState(evf::Exception &e)
void setReadyToShutDown(bool readyValue)
Definition: IPCMethod.h:190
virtual bool sendDataWhileHalting()=0
void configureResources(xdaq::Application *app)
xdata::UnsignedInteger32 timeOutSec_