CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FUResourceBroker.cc
Go to the documentation of this file.
1 //
3 // FUResourceBroker
4 // ----------------
5 //
6 // 10/20/2006 Philipp Schieferdecker <philipp.schieferdecker@cern.ch>
8 
9 
11 
15 
17 
18 #include "EvffedFillerRB.h"
19 
20 #include "i2o/Method.h"
21 #include "interface/shared/i2oXFunctionCodes.h"
22 #include "interface/evb/i2oEVBMsgs.h"
23 #include "xcept/tools.h"
24 
25 #include "toolbox/mem/HeapAllocator.h"
26 #include "toolbox/mem/Reference.h"
27 #include "toolbox/mem/MemoryPoolFactory.h"
28 #include "toolbox/mem/exception/Exception.h"
29 
30 #include "xoap/MessageReference.h"
31 #include "xoap/MessageFactory.h"
32 #include "xoap/SOAPEnvelope.h"
33 #include "xoap/SOAPBody.h"
34 #include "xoap/domutils.h"
35 #include "xoap/Method.h"
36 
37 #include "cgicc/CgiDefs.h"
38 #include "cgicc/Cgicc.h"
39 #include "cgicc/FormEntry.h"
40 #include "cgicc/HTMLClasses.h"
41 
42 #include <signal.h>
43 #include <iostream>
44 #include <sstream>
45 
46 
47 using namespace std;
48 using namespace evf;
49 
50 
52 // construction/destruction
54 
55 //______________________________________________________________________________
56 FUResourceBroker::FUResourceBroker(xdaq::ApplicationStub *s)
57  : xdaq::Application(s)
58  , fsm_(this)
59  , gui_(0)
60  , log_(getApplicationLogger())
61  , bu_(0)
62  , sm_(0)
63  , i2oPool_(0)
64  , resourceTable_(0)
65  , wlMonitoring_(0)
66  , asMonitoring_(0)
67  , wlWatching_(0)
68  , asWatching_(0)
69  , instance_(0)
70  , runNumber_(0)
71  , deltaT_(0.0)
72  , deltaN_(0)
73  , deltaSumOfSquares_(0)
74  , deltaSumOfSizes_(0)
75  , throughput_(0.0)
76  , rate_(0.0)
77  , average_(0.0)
78  , rms_(0.0)
79  , nbAllocatedEvents_(0)
80  , nbPendingRequests_(0)
81  , nbReceivedEvents_(0)
82  , nbSentEvents_(0)
83  , nbSentDqmEvents_(0)
84  , nbSentErrorEvents_(0)
85  , nbPendingSMDiscards_(0)
86  , nbPendingSMDqmDiscards_(0)
87  , nbDiscardedEvents_(0)
88  , nbReceivedEol_(0)
89  , highestEolReceived_(0)
90  , nbEolPosted_(0)
91  , nbEolDiscarded_(0)
92  , nbLostEvents_(0)
93  , nbDataErrors_(0)
94  , nbCrcErrors_(0)
95  , nbTimeoutsWithEvent_(0)
96  , nbTimeoutsWithoutEvent_(0)
97  , dataErrorFlag_(0)
98  , segmentationMode_(false)
99  , nbClients_(0)
100  , clientPrcIds_("")
101  , nbRawCells_(16)
102  , nbRecoCells_(8)
103  , nbDqmCells_(8)
104  , rawCellSize_(0x400000) // 4MB
105  , recoCellSize_(0x800000) // 8MB
106  , dqmCellSize_(0x800000) // 8MB
107  , doDropEvents_(false)
108  , doFedIdCheck_(true)
109  , doCrcCheck_(1)
110  , doDumpEvents_(0)
111  , buClassName_("BU")
112  , buInstance_(0)
113  , smClassName_("StorageManager")
114  , smInstance_(0)
115  , shmResourceTableTimeout_(200000)
116  , monSleepSec_(2)
117  , watchSleepSec_(10)
118  , timeOutSec_(30)
119  , processKillerEnabled_(true)
120  , useEvmBoard_(true)
121  , reasonForFailed_("")
122  , nbAllocateSent_(0)
123  , nbTakeReceived_(0)
124  , nbDataDiscardReceived_(0)
125  , nbDqmDiscardReceived_(0)
126  , nbSentLast_(0)
127  , sumOfSquaresLast_(0)
128  , sumOfSizesLast_(0)
129  , lock_(toolbox::BSem::FULL)
130  , frb_(0)
131  , shmInconsistent_(false)
132 {
133  // setup finite state machine (binding relevant callbacks)
135 
136  // set url, class, instance, and sourceId (=class_instance)
137  url_ =
138  getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
139  getApplicationDescriptor()->getURN();
140  class_ =getApplicationDescriptor()->getClassName();
141  instance_=getApplicationDescriptor()->getInstance();
142  sourceId_=class_.toString()+"_"+instance_.toString();
143 
144  // bind i2o callbacks
146  I2O_FU_TAKE,XDAQ_ORGANIZATION_ID);
148  I2O_FU_DATA_DISCARD,XDAQ_ORGANIZATION_ID);
150  I2O_FU_DQM_DISCARD,XDAQ_ORGANIZATION_ID);
152  I2O_EVM_LUMISECTION,XDAQ_ORGANIZATION_ID);
153 
154 
155  // bind HyperDAQ web pages
156  xgi::bind(this,&evf::FUResourceBroker::webPageRequest,"Default");
157  gui_=new WebGUI(this,&fsm_);
158  vector<toolbox::lang::Method*> methods=gui_->getMethods();
159  vector<toolbox::lang::Method*>::iterator it;
160  for (it=methods.begin();it!=methods.end();++it) {
161  if ((*it)->type()=="cgi") {
162  string name=static_cast<xgi::MethodSignature*>(*it)->name();
163  xgi::bind(this,&evf::FUResourceBroker::webPageRequest,name);
164  }
165  }
166  xgi::bind(this,&evf::FUResourceBroker::customWebPage,"customWebPage");
167 
168 
169  // allocate i2o memory pool
170  string i2oPoolName=sourceId_+"_i2oPool";
171  try {
172  toolbox::mem::HeapAllocator *allocator=new toolbox::mem::HeapAllocator();
173  toolbox::net::URN urn("toolbox-mem-pool",i2oPoolName);
174  toolbox::mem::MemoryPoolFactory* poolFactory=
175  toolbox::mem::getMemoryPoolFactory();
176  i2oPool_=poolFactory->createPool(urn,allocator);
177  }
179  string s="Failed to create pool: "+i2oPoolName;
180  LOG4CPLUS_FATAL(log_,s);
181  XCEPT_RETHROW(xcept::Exception,s,e);
182  }
183 
184  // publish all parameters to app info space
186 
187  // findRcmsStateListener
189 
190  // set application icon for hyperdaq
191  getApplicationDescriptor()->setAttribute("icon", "/evf/images/rbicon.jpg");
192  //FUResource::useEvmBoard_ = useEvmBoard_;
193 }
194 
195 
196 //______________________________________________________________________________
198 {
199 
200 }
201 
202 
203 
205 // implementation of member functions
207 
208 //______________________________________________________________________________
209 bool FUResourceBroker::configuring(toolbox::task::WorkLoop* wl)
210 {
211  try {
212  LOG4CPLUS_INFO(log_, "Start configuring ...");
214  frb_ = new EvffedFillerRB(this);
216  if(shmInconsistent_){
217  std::ostringstream ost;
218  ost << "configuring FAILED: Inconsistency in ResourceTable - nbRaw="
219  << nbRawCells_.value_ << " but nbResources=" << resourceTable_->nbResources()
220  << " and nbFreeSlots=" << resourceTable_->nbFreeSlots();
221  XCEPT_RAISE(evf::Exception,ost.str());
222  }
223  LOG4CPLUS_INFO(log_, "Finished configuring!");
224  fsm_.fireEvent("ConfigureDone",this);
225 
226  }
227  catch (xcept::Exception &e) {
228  std::string msg = "configuring FAILED: " + (string)e.what();
229  reasonForFailed_ = e.what();
230  fsm_.fireFailed(msg,this);
231  }
232 
233  return false;
234 }
235 
236 
237 //______________________________________________________________________________
238 bool FUResourceBroker::enabling(toolbox::task::WorkLoop* wl)
239 {
240  try {
241  LOG4CPLUS_INFO(log_, "Start enabling ...");
242  reset();
246  lock();
248  unlock();
253 
256  dataErrorFlag_ = 0;
257 
258  LOG4CPLUS_INFO(log_, "Finished enabling!");
259  fsm_.fireEvent("EnableDone",this);
260  }
261  catch (xcept::Exception &e) {
262  std::string msg = "enabling FAILED: "+xcept::stdformat_exception_history(e);
263  reasonForFailed_ = e.what();
264  fsm_.fireFailed(msg,this);
265  }
266 
267  return false;
268 }
269 
270 
271 //______________________________________________________________________________
272 bool FUResourceBroker::stopping(toolbox::task::WorkLoop* wl)
273 {
274  try {
275  LOG4CPLUS_INFO(log_, "Start stopping :) ...");
276  resourceTable_->stop();
277  timeval now;
278  timeval then;
279  gettimeofday(&then,0);
280  while (!resourceTable_->isReadyToShutDown()) {
281  ::usleep(shmResourceTableTimeout_.value_*10);
282  gettimeofday(&now,0);
283  if ((unsigned int)(now.tv_sec-then.tv_sec) > shmResourceTableTimeout_.value_/10000) {
284  std::cout << "times: " << now.tv_sec << " " << then.tv_sec << " "
285  << shmResourceTableTimeout_.value_/10000 << std::endl;
286  LOG4CPLUS_WARN(log_, "Some Process did not detach - going to Emergency stop!");
287  emergencyStop();
288  break;
289  }
290  }
291 
293  LOG4CPLUS_INFO(log_, "Finished stopping!");
294  fsm_.fireEvent("StopDone",this);
295  }
296  }
297  catch (xcept::Exception &e) {
298  std::string msg = "stopping FAILED: "+xcept::stdformat_exception_history(e);
299  reasonForFailed_ = e.what();
300  fsm_.fireFailed(msg,this);
301  }
302 
303  return false;
304 }
305 
306 
307 //______________________________________________________________________________
308 bool FUResourceBroker::halting(toolbox::task::WorkLoop* wl)
309 {
310  try {
311  LOG4CPLUS_INFO(log_, "Start halting ...");
312  if (resourceTable_->isActive()) {
313  resourceTable_->halt();
314  UInt_t count = 0;
315  while (count<10) {
317  lock();
318  delete resourceTable_;
319  resourceTable_=0;
320  unlock();
321  LOG4CPLUS_INFO(log_,count+1<<". try to destroy resource table succeeded!");
322  break;
323  }
324  else {
325  count++;
326  LOG4CPLUS_DEBUG(log_,count<<". try to destroy resource table failed ...");
327  ::sleep(1);
328  }
329  }
330  }
331  else {
332  lock();
333  delete resourceTable_;
334  resourceTable_=0;
335  unlock();
336  }
337 
338  if (0==resourceTable_) {
339  LOG4CPLUS_INFO(log_,"Finished halting!");
340  fsm_.fireEvent("HaltDone",this);
341  }
342  else {
343  std::string msg = "halting FAILED: ResourceTable shutdown timed out.";
344  reasonForFailed_ = "RESOURCETABLE SHUTDOWN TIMED OUT";
345  fsm_.fireFailed(msg,this);
346  }
347  }
348  catch (xcept::Exception &e) {
349  std::string msg = "halting FAILED: "+xcept::stdformat_exception_history(e);
350  reasonForFailed_ = e.what();
351  fsm_.fireFailed(msg,this);
352  }
353  if(frb_) delete frb_;
354  return false;
355 }
356 
357 
358 //______________________________________________________________________________
359 xoap::MessageReference FUResourceBroker::fsmCallback(xoap::MessageReference msg)
361 {
362  return fsm_.commandCallback(msg);
363 }
364 
365 
366 //______________________________________________________________________________
367 void FUResourceBroker::I2O_FU_TAKE_Callback(toolbox::mem::Reference* bufRef)
368 {
369  nbTakeReceived_.value_++;
370  if(fsm_.checkIfEnabled()){
371  bool eventComplete=resourceTable_->buildResource(bufRef);
372  if (eventComplete&&doDropEvents_) resourceTable_->dropEvent();
373  }
374  else{
375  LOG4CPLUS_ERROR(log_,"TAKE i2o frame received in state "
376  << fsm_.stateName() << " is being lost");
377  bufRef->release();
378  }
379 }
380 
381 //______________________________________________________________________________
382 void FUResourceBroker::I2O_EVM_LUMISECTION_Callback(toolbox::mem::Reference* bufRef)
383 {
384 
385  I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *msg =
386  (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *)bufRef->getDataLocation();
387  if(msg->lumiSection==0){
388  LOG4CPLUS_ERROR(log_,"EOL message received for ls=0!!! ");
389  fsm_.fireFailed("EOL message received for ls=0!!! ",this);
390  }
391  nbReceivedEol_++;
392  if(highestEolReceived_.value_+100 < msg->lumiSection)
393  {
394  LOG4CPLUS_ERROR(log_,"EOL message not in sequence, expected "
395  << highestEolReceived_.value_+1
396  << " received " << msg->lumiSection);
397  fsm_.fireFailed("EOL message with corrupted LS ",this);
398  }
399  if(highestEolReceived_.value_+1 != msg->lumiSection)
400  LOG4CPLUS_WARN(log_,"EOL message not in sequence, expected "
401  << highestEolReceived_.value_+1
402  << " received " << msg->lumiSection);
403 
404  if(highestEolReceived_.value_ < msg->lumiSection)
405  highestEolReceived_.value_ = msg->lumiSection;
406  resourceTable_->postEndOfLumiSection(bufRef); // this method dummy for now
407 // I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *msg =
408 // (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *)bufRef->getDataLocation();
409 
410 // LOG4CPLUS_WARN(log_, "Received END-OF-LS from EVM for LS " << msg->lumiSection);
411 
412 }
413 
414 
415 
416 
417 //______________________________________________________________________________
418 void FUResourceBroker::I2O_FU_DATA_DISCARD_Callback(toolbox::mem::Reference* bufRef)
419 {
420  nbDataDiscardReceived_.value_++;
422 }
423 
424 
425 //______________________________________________________________________________
426 void FUResourceBroker::I2O_FU_DQM_DISCARD_Callback(toolbox::mem::Reference* bufRef)
427 {
428  nbDqmDiscardReceived_.value_++;
430 }
431 
432 
433 //______________________________________________________________________________
435 {
436  typedef set<xdaq::ApplicationDescriptor*> AppDescSet_t;
437  typedef AppDescSet_t::iterator AppDescIter_t;
438 
439  // locate input BU
440  AppDescSet_t setOfBUs=
441  getApplicationContext()->getDefaultZone()->
442  getApplicationDescriptors(buClassName_.toString());
443 
444  if (0!=bu_) { delete bu_; bu_=0; }
445 
446  for (AppDescIter_t it=setOfBUs.begin();it!=setOfBUs.end();++it)
447  if ((*it)->getInstance()==buInstance_)
448  bu_=new BUProxy(getApplicationDescriptor(),*it,
449  getApplicationContext(),i2oPool_);
450 
451  if (0==bu_) {
452  string msg=sourceId_+" failed to locate input BU!";
453  XCEPT_RAISE(evf::Exception,msg);
454  }
455 
456  // locate output SM
457  AppDescSet_t setOfSMs=
458  getApplicationContext()->getDefaultZone()->
459  getApplicationDescriptors(smClassName_.toString());
460 
461  if (0!=sm_) { delete sm_; sm_=0; }
462 
463  for (AppDescIter_t it=setOfSMs.begin();it!=setOfSMs.end();++it)
464  if ((*it)->getInstance()==smInstance_)
465  sm_=new SMProxy(getApplicationDescriptor(),*it,
466  getApplicationContext(),i2oPool_);
467 
468  if (0==sm_) LOG4CPLUS_WARN(log_,sourceId_<<" failed to locate output SM!");
469 }
470 
471 
472 //______________________________________________________________________________
475 {
476  string name=in->getenv("PATH_INFO");
477  if (name.empty()) name="defaultWebPage";
478  static_cast<xgi::MethodSignature*>(gui_->getMethod(name))->invoke(in,out);
479 }
480 
481 
482 //______________________________________________________________________________
484 {
485  lock();
486 
487  if (0!=resourceTable_) {
488 
489  //gui_->monInfoSpace()->lock();
490 
491  if (e.type()=="urn:xdata-event:ItemGroupRetrieveEvent") {
509  dataErrorFlag_.value_ = (nbCrcErrors_.value_ != 0u +
510  ((nbDataErrors_.value_ != 0u) << 1) +
511  ((nbLostEvents_.value_ != 0u) << 2) +
512  ((nbTimeoutsWithEvent_.value_ != 0u) << 3) +
513  ((nbTimeoutsWithoutEvent_.value_ != 0u) << 4) +
514  ((nbSentErrorEvents_.value_ != 0u) << 5)
515  );
516  }
517  else if (e.type()=="ItemChangedEvent") {
518 
519  string item=dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
520 
521  if (item=="doFedIdCheck") FUResource::doFedIdCheck(doFedIdCheck_);
522  if (item=="useEvmBoard") FUResource::useEvmBoard(useEvmBoard_);
523  if (item=="doCrcCheck") resourceTable_->setDoCrcCheck(doCrcCheck_);
524  if (item=="doDumpEvents") resourceTable_->setDoDumpEvents(doDumpEvents_);
525  }
526 
527  //gui_->monInfoSpace()->unlock();
528  }
529  else {
530  nbClients_ =0;
531  clientPrcIds_ ="";
535  nbSentEvents_ =0;
536  nbSentDqmEvents_ =0;
541  nbLostEvents_ =0;
542  nbDataErrors_ =0;
543  nbCrcErrors_ =0;
544  nbAllocateSent_ =0;
545  }
546  unlock();
547 }
548 
549 
550 //______________________________________________________________________________
552 {
553  struct timezone timezone;
554  gettimeofday(&monStartTime_,&timezone);
555 
556  try {
558  toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+"Monitoring",
559  "waiting");
560  if (!wlMonitoring_->isActive()) wlMonitoring_->activate();
561  asMonitoring_=toolbox::task::bind(this,&FUResourceBroker::monitoring,
562  sourceId_+"Monitoring");
563  wlMonitoring_->submit(asMonitoring_);
564  }
565  catch (xcept::Exception& e) {
566  string msg = "Failed to start workloop 'Monitoring'.";
567  XCEPT_RETHROW(evf::Exception,msg,e);
568  }
569 }
570 
571 
572 //______________________________________________________________________________
573 bool FUResourceBroker::monitoring(toolbox::task::WorkLoop* wl)
574 {
575  unsigned int nbSent;
576  uint64_t sumOfSquares;
577  unsigned int sumOfSizes;
578  uint64_t deltaSumOfSquares;
579 
580  lock();
581  if (0==resourceTable_) {
582  deltaT_.value_ =0.0;
583  deltaN_.value_ = 0;
584  deltaSumOfSquares_.value_=0.0;
585  deltaSumOfSizes_.value_ = 0;
586  throughput_ =0.0;
587  rate_ =0.0;
588  average_ =0.0;
589  rms_ =0.0;
590  unlock();
591  return false;
592  }
593  else {
594  nbSent =resourceTable_->nbSent();
595  sumOfSquares=resourceTable_->sumOfSquares();
596  sumOfSizes =resourceTable_->sumOfSizes();
597  }
598  unlock();
599 
600  struct timeval monEndTime;
601  struct timezone timezone;
602 
603  gettimeofday(&monEndTime,&timezone);
604 
605  xdata::getInfoSpaceFactory()->lock();
606  gui_->monInfoSpace()->lock();
607 
608  deltaT_.value_=deltaT(&monStartTime_,&monEndTime);
609  monStartTime_=monEndTime;
610 
611  deltaN_.value_=nbSent-nbSentLast_;
612  nbSentLast_=nbSent;
613 
614  deltaSumOfSquares=sumOfSquares-sumOfSquaresLast_;
615  deltaSumOfSquares_.value_=(double)deltaSumOfSquares;
616  sumOfSquaresLast_=sumOfSquares;
617 
618  deltaSumOfSizes_.value_=sumOfSizes-sumOfSizesLast_;
619  sumOfSizesLast_=sumOfSizes;
620 
621  if (deltaT_.value_!=0) {
622  throughput_=deltaSumOfSizes_.value_/deltaT_.value_;
623  rate_ =deltaN_.value_/deltaT_.value_;
624  }
625  else {
626  throughput_=0.0;
627  rate_ =0.0;
628  }
629 
630  double meanOfSquares,mean,squareOfMean,variance;
631 
632  if(deltaN_.value_!=0) {
633  meanOfSquares=deltaSumOfSquares_.value_/((double)(deltaN_.value_));
634  mean=((double)(deltaSumOfSizes_.value_))/((double)(deltaN_.value_));
635  squareOfMean=mean*mean;
636  variance=meanOfSquares-squareOfMean; if(variance<0.0) variance=0.0;
637 
638  average_=deltaSumOfSizes_.value_/deltaN_.value_;
639  rms_ =std::sqrt(variance);
640  }
641  else {
642  average_=0.0;
643  rms_ =0.0;
644  }
645 
646  gui_->monInfoSpace()->unlock();
647  xdata::getInfoSpaceFactory()->unlock();
648 
649  ::sleep(monSleepSec_.value_);
650 
651  return true;
652 }
653 
654 
655 //______________________________________________________________________________
657 {
658  try {
659  wlWatching_=
660  toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+"Watching",
661  "waiting");
662  if (!wlWatching_->isActive()) wlWatching_->activate();
663  asWatching_=toolbox::task::bind(this,&FUResourceBroker::watching,
664  sourceId_+"Watching");
665  wlWatching_->submit(asWatching_);
666  }
667  catch (xcept::Exception& e) {
668  string msg = "Failed to start workloop 'Watching'.";
669  XCEPT_RETHROW(evf::Exception,msg,e);
670  }
671 }
672 
673 
674 //______________________________________________________________________________
675 bool FUResourceBroker::watching(toolbox::task::WorkLoop* wl)
676 {
677  lock();
678 
679  if (0==resourceTable_) {
680  unlock();
681  return false;
682  }
683 
684  vector<pid_t> evt_prcids =resourceTable_->cellPrcIds();
685  vector<UInt_t> evt_numbers=resourceTable_->cellEvtNumbers();
686  vector<time_t> evt_tstamps=resourceTable_->cellTimeStamps();
687 
688  time_t tcurr=time(0);
689  for (UInt_t i=0;i<evt_tstamps.size();i++) {
690  pid_t pid =evt_prcids[i];
691  UInt_t evt =evt_numbers[i];
692  time_t tstamp=evt_tstamps[i]; if (tstamp==0) continue;
693  double tdiff =difftime(tcurr,tstamp);
694  if (tdiff>timeOutSec_) {
696  LOG4CPLUS_ERROR(log_,"evt "<<evt<<" timed out, "<<"kill prc "<<pid);
697  kill(pid,9);
699  }
700  else {
701  LOG4CPLUS_INFO(log_,"evt "<<evt<<" under processing for more than "
702  <<timeOutSec_<<"sec for process "<<pid);
703  }
704  }
705  }
706 
707  vector<pid_t> prcids=resourceTable_->clientPrcIds();
708  for (UInt_t i=0;i<prcids.size();i++) {
709  pid_t pid =prcids[i];
710  int status=kill(pid,0);
711  if (status!=0) {
712  LOG4CPLUS_ERROR(log_,"EP prc "<<pid<<" died, send to error stream if processing.");
715  }
716  }
717 
719  std::ostringstream ost;
720  ost << "Watchdog spotted inconsistency in ResourceTable - nbRaw="
721  << nbRawCells_.value_ << " but nbResources=" << resourceTable_->nbResources()
722  << " and nbFreeSlots=" << resourceTable_->nbFreeSlots();
723  XCEPT_DECLARE(evf::Exception,
724  sentinelException, ost.str());
725  notifyQualified("error",sentinelException);
726  shmInconsistent_ = true;
727  }
728 
729  unlock();
730 
731  ::sleep(watchSleepSec_.value_);
732  return true;
733 }
734 
735 
736 //______________________________________________________________________________
738 {
739  assert(0!=gui_);
740 
741  gui_->addMonitorParam("url", &url_);
742  gui_->addMonitorParam("class", &class_);
743  gui_->addMonitorParam("instance", &instance_);
744  gui_->addMonitorParam("runNumber", &runNumber_);
745  gui_->addMonitorParam("stateName", fsm_.stateName());
746 
747  gui_->addMonitorParam("deltaT", &deltaT_);
748  gui_->addMonitorParam("deltaN", &deltaN_);
749  gui_->addMonitorParam("deltaSumOfSquares", &deltaSumOfSquares_);
750  gui_->addMonitorParam("deltaSumOfSizes", &deltaSumOfSizes_);
751 
752  gui_->addMonitorParam("throughput", &throughput_);
753  gui_->addMonitorParam("rate", &rate_);
754  gui_->addMonitorParam("average", &average_);
755  gui_->addMonitorParam("rms", &rms_);
756  gui_->addMonitorParam("dataErrorFlag", &dataErrorFlag_);
757 
758  gui_->addMonitorCounter("nbAllocatedEvents", &nbAllocatedEvents_);
759  gui_->addMonitorCounter("nbPendingRequests", &nbPendingRequests_);
760  gui_->addMonitorCounter("nbReceivedEvents", &nbReceivedEvents_);
761  gui_->addMonitorCounter("nbSentEvents", &nbSentEvents_);
762  gui_->addMonitorCounter("nbSentErrorEvents", &nbSentErrorEvents_);
763  gui_->addMonitorCounter("nbDiscardedEvents", &nbDiscardedEvents_);
764  gui_->addMonitorCounter("nbReceivedEol", &nbReceivedEol_);
765  gui_->addMonitorCounter("highestEolReceived", &highestEolReceived_);
766  gui_->addMonitorCounter("nbEolPosted", &nbEolPosted_);
767  gui_->addMonitorCounter("nbEolDiscarded", &nbEolDiscarded_);
768 
769  gui_->addMonitorCounter("nbPendingSMDiscards", &nbPendingSMDiscards_);
770 
771  gui_->addMonitorCounter("nbSentDqmEvents", &nbSentDqmEvents_);
772  gui_->addMonitorCounter("nbDqmDiscardReceived", &nbDqmDiscardReceived_);
773  gui_->addMonitorCounter("nbPendingSMDqmDiscards", &nbPendingSMDqmDiscards_);
774 
775  gui_->addMonitorCounter("nbLostEvents", &nbLostEvents_);
776  gui_->addMonitorCounter("nbDataErrors", &nbDataErrors_);
777  gui_->addMonitorCounter("nbCrcErrors", &nbCrcErrors_);
778  gui_->addMonitorCounter("nbTimeoutsWithEvent", &nbTimeoutsWithEvent_);
779  gui_->addMonitorCounter("nbTimeoutsWithoutEvent", &nbTimeoutsWithoutEvent_);
780 
781  gui_->addStandardParam("segmentationMode", &segmentationMode_);
782  gui_->addStandardParam("nbClients", &nbClients_);
783  gui_->addStandardParam("clientPrcIds", &clientPrcIds_);
784  gui_->addStandardParam("nbRawCells", &nbRawCells_);
785  gui_->addStandardParam("nbRecoCells", &nbRecoCells_);
786  gui_->addStandardParam("nbDqmCells", &nbDqmCells_);
787  gui_->addStandardParam("rawCellSize", &rawCellSize_);
788  gui_->addStandardParam("recoCellSize", &recoCellSize_);
789  gui_->addStandardParam("dqmCellSize", &dqmCellSize_);
790 
791  gui_->addStandardParam("doDropEvents", &doDropEvents_);
792  gui_->addStandardParam("doFedIdCheck", &doFedIdCheck_);
793  gui_->addStandardParam("doCrcCheck", &doCrcCheck_);
794  gui_->addStandardParam("doDumpEvents", &doDumpEvents_);
795  gui_->addStandardParam("buClassName", &buClassName_);
796  gui_->addStandardParam("buInstance", &buInstance_);
797  gui_->addStandardParam("smClassName", &smClassName_);
798  gui_->addStandardParam("smInstance", &smInstance_);
799  gui_->addStandardParam("shmResourceTableTimeout", &shmResourceTableTimeout_);
800  gui_->addStandardParam("monSleepSec", &monSleepSec_);
801  gui_->addStandardParam("watchSleepSec", &watchSleepSec_);
802  gui_->addStandardParam("timeOutSec", &timeOutSec_);
803  gui_->addStandardParam("processKillerEnabled", &processKillerEnabled_);
804  gui_->addStandardParam("useEvmBoard", &useEvmBoard_);
805  gui_->addStandardParam("rcmsStateListener", fsm_.rcmsStateListener());
806  gui_->addStandardParam("foundRcmsStateListener", fsm_.foundRcmsStateListener());
807  gui_->addStandardParam("reasonForFailed", &reasonForFailed_);
808 
809  gui_->addDebugCounter("nbAllocateSent", &nbAllocateSent_);
810  gui_->addDebugCounter("nbTakeReceived", &nbTakeReceived_);
811  gui_->addDebugCounter("nbDataDiscardReceived", &nbDataDiscardReceived_);
812 
814 
815  gui_->addItemChangedListener("doFedIdCheck", this);
816  gui_->addItemChangedListener("useEvmBoard", this);
817  gui_->addItemChangedListener("doCrcCheck", this);
818  gui_->addItemChangedListener("doDumpEvents", this);
819 }
820 
821 
822 //______________________________________________________________________________
824 {
825  gui_->resetCounters();
826 
827  deltaT_ =0.0;
828  deltaN_ = 0;
829  deltaSumOfSquares_=0.0;
830  deltaSumOfSizes_ = 0;
831 
832  throughput_ =0.0;
833  rate_ =0.0;
834  average_ =0.0;
835  rms_ =0.0;
836 
837  nbSentLast_ = 0;
838  sumOfSquaresLast_ = 0;
839  sumOfSizesLast_ = 0;
840 }
841 
842 
843 //______________________________________________________________________________
844 double FUResourceBroker::deltaT(const struct timeval *start,
845  const struct timeval *end)
846 {
847  unsigned int sec;
848  unsigned int usec;
849 
850  sec = end->tv_sec - start->tv_sec;
851 
852  if(end->tv_usec > start->tv_usec) {
853  usec = end->tv_usec - start->tv_usec;
854  }
855  else {
856  sec--;
857  usec = 1000000 - ((unsigned int )(start->tv_usec - end->tv_usec));
858  }
859 
860  return ((double)sec) + ((double)usec) / 1000000.0;
861 }
862 
863 
864 
865 //______________________________________________________________________________
868 {
869  using namespace cgicc;
870  Cgicc cgi(in);
871  std::vector<FormEntry> els = cgi.getElements() ;
872  for(std::vector<FormEntry>::iterator it = els.begin(); it != els.end(); it++)
873  std::cout << "form entry " << (*it).getValue() << std::endl;
874 
875  std::vector<FormEntry> el1;
876  cgi.getElement("crcError",el1);
877  *out<<"<html>"<<endl;
878  gui_->htmlHead(in,out,sourceId_);
879  *out<<"<body>"<<endl;
880  gui_->htmlHeadline(in,out);
881 
882  lock();
883 
884  if (0!=resourceTable_) {
885  if(el1.size()!=0) {
886  resourceTable_->injectCRCError();
887  }
888  *out << "<form method=\"GET\" action=\"customWebPage\" >";
889  *out << "<button name=\"crcError\" type=\"submit\" value=\"injCRC\">Inject CRC</button>" << endl;
890  *out << "</form>" << endl;
891  *out << "<hr/>" << std::endl;
892  vector<pid_t> client_prc_ids = resourceTable_->clientPrcIds();
893  *out<<table().set("frame","void").set("rules","rows")
894  .set("class","modules").set("width","250")<<endl
895  <<tr()<<th("Client Processes").set("colspan","3")<<tr()<<endl
896  <<tr()
897  <<th("client").set("align","left")
898  <<th("process id").set("align","center")
899  <<th("status").set("align","center")
900  <<tr()
901  <<endl;
902  for (UInt_t i=0;i<client_prc_ids.size();i++) {
903 
904  pid_t pid =client_prc_ids[i];
905  int status=kill(pid,0);
906 
907  stringstream ssi; ssi<<i+1;
908  stringstream sspid; sspid<<pid;
909  stringstream ssstatus; ssstatus<<status;
910 
911  string bg_status = (status==0) ? "#00ff00" : "ff0000";
912  *out<<tr()
913  <<td(ssi.str()).set("align","left")
914  <<td(sspid.str()).set("align","center")
915  <<td(ssstatus.str()).set("align","center").set("bgcolor",bg_status)
916  <<tr()<<endl;
917  }
918  *out<<table()<<endl;
919  *out<<"<br><br>"<<endl;
920 
921  vector<string> states = resourceTable_->cellStates();
922  vector<UInt_t> evt_numbers = resourceTable_->cellEvtNumbers();
923  vector<pid_t> prc_ids = resourceTable_->cellPrcIds();
924  vector<time_t> time_stamps = resourceTable_->cellTimeStamps();
925 
926  *out<<table().set("frame","void").set("rules","rows")
927  .set("class","modules").set("width","500")<<endl
928  <<tr()<<th("Shared Memory Cells").set("colspan","6")<<tr()<<endl
929  <<tr()
930  <<th("cell").set("align","left")
931  <<th("state").set("align","center")
932  <<th("event").set("align","center")
933  <<th("process id").set("align","center")
934  <<th("timestamp").set("align","center")
935  <<th("time").set("align","center")
936  <<tr()
937  <<endl;
938  for (UInt_t i=0;i<states.size();i++) {
939  string state=states[i];
940  UInt_t evt = evt_numbers[i];
941  pid_t pid = prc_ids[i];
942  time_t tstamp= time_stamps[i];
943  double tdiff = difftime(time(0),tstamp);
944 
945  stringstream ssi; ssi<<i;
946  stringstream ssevt; if (evt!=0xffffffff) ssevt<<evt; else ssevt<<" - ";
947  stringstream sspid; if (pid!=0) sspid<<pid; else sspid<<" - ";
948  stringstream sststamp; if (tstamp!=0) sststamp<<tstamp; else sststamp<<" - ";
949  stringstream sstdiff; if (tstamp!=0) sstdiff<<tdiff; else sstdiff<<" - ";
950 
951  string bg_state = "#ffffff";
952  if (state=="RAWWRITING"||state=="RAWWRITTEN"||
953  state=="RAWREADING"||state=="RAWREAD")
954  bg_state="#99CCff";
955  else if (state=="PROCESSING")
956  bg_state="#ff0000";
957  else if (state=="PROCESSED"||state=="RECOWRITING"||state=="RECOWRITTEN")
958  bg_state="#CCff99";
959  else if (state=="SENDING")
960  bg_state="#00FF33";
961  else if (state=="SENT")
962  bg_state="#006633";
963  else if (state=="DISCARDING")
964  bg_state="#FFFF00";
965  else if (state=="LUMISECTION")
966  bg_state="#0000FF";
967 
968  *out<<tr()
969  <<td(ssi.str()).set("align","left")
970  <<td(state).set("align","center").set("bgcolor",bg_state)
971  <<td(ssevt.str()).set("align","center")
972  <<td(sspid.str()).set("align","center")
973  <<td(sststamp.str()).set("align","center")
974  <<td(sstdiff.str()).set("align","center")
975  <<tr()<<endl;
976  }
977  *out<<table()<<endl;
978  *out<<"<br><br>"<<endl;
979 
980  vector<string> dqmstates = resourceTable_->dqmCellStates();
981 
982  *out<<table().set("frame","void").set("rules","rows")
983  .set("class","modules").set("width","500")<<endl
984  <<tr()<<th("Shared Memory DQM Cells").set("colspan","6")<<tr()<<endl
985  <<tr()
986  <<th("cell").set("align","left")
987  <<th("state").set("align","center")
988  <<tr()
989  <<endl;
990  for (UInt_t i=0;i<dqmstates.size();i++) {
991  string state=dqmstates[i];
992 
993  string bg_state = "#ffffff";
994  if (state=="WRITING"||state=="WRITTEN")
995  bg_state="#99CCff";
996  else if (state=="SENDING")
997  bg_state="#00FF33";
998  else if (state=="SENT")
999  bg_state="#006633";
1000  else if (state=="DISCARDING")
1001  bg_state="#FFFF00";
1002 
1003  *out<<tr()<<"<td>"<<i<<"</td>"
1004  <<td(state).set("align","center").set("bgcolor",bg_state)
1005  <<tr()<<endl;
1006  }
1007  *out<<table()<<endl;
1008 
1009 
1010 
1011  }
1012  *out<<"</body>"<<endl<<"</html>"<<endl;
1013 
1014  unlock();
1015 }
1016 
1018 {
1019  LOG4CPLUS_WARN(log_, "in Emergency stop - handle non-clean stops");
1020  vector<pid_t> client_prc_ids = resourceTable_->clientPrcIds();
1021  for (UInt_t i=0;i<client_prc_ids.size();i++) {
1022  pid_t pid =client_prc_ids[i];
1023  std::cout << "B: killing process " << i << "pid=" << pid << std::endl;
1024  if(pid!=0){
1025  //assume processes are dead by now
1028  else
1030  }
1031  }
1033  ::sleep(1);
1035  {
1036  reasonForFailed_ = "EmergencyStop: failed to shut down ResourceTable";
1037  XCEPT_RAISE(evf::Exception,reasonForFailed_);
1038  }
1040  lock();
1041  std::cout << "delete resourcetable" <<std::endl;
1042  delete resourceTable_;
1043  resourceTable_=0;
1044  std::cout << "cycle through resourcetable config " << std::endl;
1046  unlock();
1047  if(shmInconsistent_) XCEPT_RAISE(evf::Exception,"Inconsistent shm state");
1048  std::cout << "done with emergency stop" << std::endl;
1049 }
1050 
1052 {
1054  nbRawCells_.value_,
1055  nbRecoCells_.value_,
1056  nbDqmCells_.value_,
1057  rawCellSize_.value_,
1058  recoCellSize_.value_,
1059  dqmCellSize_.value_,
1060  bu_,sm_,
1061  log_,
1062  shmResourceTableTimeout_.value_,
1063  frb_,
1064  this);
1069  reset();
1070  shmInconsistent_ = false;
1071  if(resourceTable_->nbResources() != nbRawCells_.value_ ||
1072  resourceTable_->nbFreeSlots() != nbRawCells_.value_)
1073  shmInconsistent_ = true;
1074 }
1075 
1076 
1078 // XDAQ instantiator implementation macro
1080 
1081 XDAQ_INSTANTIATOR_IMPL(FUResourceBroker)
list table
Definition: asciidump.py:386
static const char runNumber_[]
xdata::String clientPrcIds_
bool discardDataEvent(MemRef_t *bufRef)
xdata::UnsignedInteger32 nbLostEvents_
UInt_t nbFreeSlots() const
void addDebugCounter(CString_t &name, Counter_t *counter)
Definition: WebGUI.cc:189
int i
Definition: DBlmapReader.cc:9
void resetCounters()
Definition: WebGUI.cc:217
std::vector< UInt_t > cellEvtNumbers() const
#define Input(cl)
Definition: vmac.h:189
xdata::UnsignedInteger32 monSleepSec_
xdata::UnsignedInteger32 nbEolPosted_
xdata::UnsignedInteger32 nbDiscardedEvents_
xdata::UnsignedInteger32 nbReceivedEvents_
xdata::UnsignedInteger32 nbPendingSMDqmDiscards_
xdata::UnsignedInteger32 nbPendingRequests_
xdata::UnsignedInteger32 nbSentDqmEvents_
xdata::UnsignedInteger32 nbRawCells_
void addStandardParam(CString_t &name, Param_t *param)
Definition: WebGUI.cc:134
bool handleCrashedEP(UInt_t runNumber, pid_t pid)
void exportParameters()
Definition: WebGUI.cc:200
void I2O_FU_TAKE_Callback(toolbox::mem::Reference *bufRef)
void setDoDumpEvents(UInt_t doDumpEvents)
bool isActive() const
xdata::UnsignedInteger32 nbTimeoutsWithoutEvent_
void customWebPage(xgi::Input *in, xgi::Output *out)
xdata::UnsignedInteger32 nbSentEvents_
xdata::UnsignedInteger32 nbSentErrorEvents_
xdata::UnsignedInteger32 buInstance_
xdata::UnsignedInteger32 nbDataDiscardReceived_
bool watching(toolbox::task::WorkLoop *wl)
UInt_t nbErrors() const
bool buildResource(MemRef_t *bufRef)
tuple els
Definition: asciidump.py:420
void addMonitorParam(CString_t &name, Param_t *param)
Definition: WebGUI.cc:145
void addItemChangedListener(CString_t &name, xdata::ActionListener *l)
Definition: WebGUI.cc:238
xdata::String reasonForFailed_
void I2O_FU_DATA_DISCARD_Callback(toolbox::mem::Reference *bufRef)
#define I2O_FU_DQM_DISCARD
Definition: i2oEvfMsgs.h:27
UInt_t nbLost() const
double deltaT(const struct timeval *start, const struct timeval *end)
static void useEvmBoard(bool useEvmBoard)
Definition: FUResource.h:57
void sleep(Duration_t)
Definition: Utils.h:163
xdata::Boolean segmentationMode_
UInt_t nbClients() const
void I2O_FU_DQM_DISCARD_Callback(toolbox::mem::Reference *bufRef)
xdata::UnsignedInteger32 highestEolReceived_
xdata::UnsignedInteger32 nbDqmDiscardReceived_
void initialize(T *app)
Definition: StateMachine.h:84
xdata::UnsignedInteger32 nbDataErrors_
xdata::UnsignedInteger32 doDumpEvents_
UInt_t nbEolDiscarded() const
xdata::UnsignedInteger32 nbClients_
xdata::Bag< xdaq2rc::ClassnameAndInstance > * rcmsStateListener()
Definition: StateMachine.h:72
void fireFailed(const std::string &errorMsg, void *originator)
bool halting(toolbox::task::WorkLoop *wl)
xdata::UnsignedInteger32 runNumber_
bool configuring(toolbox::task::WorkLoop *wl)
xdata::UnsignedInteger32 smInstance_
UInt_t nbDiscarded() const
uint64_t sumOfSquares() const
bool isReadyToShutDown() const
toolbox::task::WorkLoop * wlMonitoring_
EvffedFillerRB * frb_
UInt_t nbPendingSMDqmDiscards() const
bool stopping(toolbox::task::WorkLoop *wl)
T sqrt(T t)
Definition: SSEVec.h:28
xdata::Boolean doFedIdCheck_
UInt_t sumOfSizes() const
std::vector< time_t > cellTimeStamps() const
std::vector< pid_t > clientPrcIds() const
xdata::UnsignedInteger32 nbTakeReceived_
UInt_t nbAllocated() const
xdata::UnsignedInteger32 timeOutSec_
xdata::UnsignedInteger32 rawCellSize_
xdata::UnsignedInteger32 nbRecoCells_
#define end
Definition: vmac.h:38
UInt_t nbCrcErrors() const
xdata::UnsignedInteger32 nbDqmCells_
UInt_t nbResources() const
void setRunNumber(UInt_t runNumber)
bool enabling(toolbox::task::WorkLoop *wl)
xdata::UnsignedInteger32 nbEolDiscarded_
UInt_t nbSentError() const
void fireEvent(const std::string &evtType, void *originator)
#define I2O_FU_DATA_DISCARD
Definition: i2oEvfMsgs.h:26
tuple out
Definition: dbtoconf.py:99
xdata::UnsignedInteger32 nbAllocatedEvents_
void actionPerformed(xdata::Event &e)
unsigned int UInt_t
Definition: FUTypes.h:12
xdata::UnsignedInteger32 nbTimeoutsWithEvent_
xdata::Boolean processKillerEnabled_
xdata::UnsignedInteger32 nbPendingSMDiscards_
std::string clientPrcIdsAsString() const
FUResourceTable * resourceTable_
xdata::UnsignedInteger32 deltaSumOfSizes_
xdata::Boolean useEvmBoard_
UInt_t nbCompleted() const
xdata::UnsignedInteger32 dqmCellSize_
unsigned long long uint64_t
Definition: Time.h:15
xdata::String * stateName()
Definition: StateMachine.h:69
xdata::InfoSpace * monInfoSpace()
Definition: WebGUI.h:72
xdata::Double deltaSumOfSquares_
UInt_t nbSent() const
UInt_t nbEolPosted() const
xdata::Boolean * foundRcmsStateListener()
Definition: StateMachine.h:78
UInt_t nbPendingSMDiscards() const
char state
Definition: procUtils.cc:75
xoap::MessageReference fsmCallback(xoap::MessageReference msg)
xdata::String smClassName_
UInt_t nbAllocSent() const
std::vector< pid_t > cellPrcIds() const
xdata::UnsignedInteger32 deltaN_
#define Output(cl)
Definition: vmac.h:193
xdata::UnsignedInteger32 recoCellSize_
xdata::String buClassName_
UInt_t nbSentDqm() const
xdata::UnsignedInteger32 watchSleepSec_
xdata::UnsignedInteger32 shmResourceTableTimeout_
static void doFedIdCheck(bool doFedIdCheck)
Definition: FUResource.h:55
void I2O_EVM_LUMISECTION_Callback(toolbox::mem::Reference *bufRef)
tuple cout
Definition: gather_cfg.py:41
void addMonitorCounter(CString_t &name, Counter_t *counter)
Definition: WebGUI.cc:178
bool monitoring(toolbox::task::WorkLoop *wl)
void postEndOfLumiSection(MemRef_t *bufRef)
string s
Definition: asciidump.py:422
toolbox::mem::Pool * i2oPool_
xdata::UnsignedInteger32 nbReceivedEol_
tuple status
Definition: ntuplemaker.py:245
toolbox::task::WorkLoop * wlWatching_
xdata::Boolean doDropEvents_
xdata::UnsignedInteger32 doCrcCheck_
xdata::UnsignedInteger32 instance_
toolbox::task::ActionSignature * asMonitoring_
bool discardDqmEvent(MemRef_t *bufRef)
xdata::UnsignedInteger32 nbCrcErrors_
void webPageRequest(xgi::Input *in, xgi::Output *out)
toolbox::task::ActionSignature * asWatching_
xdata::UnsignedInteger32 nbAllocateSent_
struct timeval monStartTime_
void setDoCrcCheck(UInt_t doCrcCheck)
void findRcmsStateListener()
xdata::UnsignedInteger32 dataErrorFlag_
evf::StateMachine fsm_
UInt_t nbPending() const