CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FUResourceBroker.cc
Go to the documentation of this file.
1 //
3 // FUResourceBroker
4 // ----------------
5 //
6 // 10/20/2006 Philipp Schieferdecker <philipp.schieferdecker@cern.ch>
8 
9 
11 
15 
17 
18 #include "EvffedFillerRB.h"
19 
20 #include "i2o/Method.h"
21 #include "interface/shared/i2oXFunctionCodes.h"
22 #include "interface/evb/i2oEVBMsgs.h"
23 #include "xcept/tools.h"
24 
25 #include "toolbox/mem/HeapAllocator.h"
26 #include "toolbox/mem/Reference.h"
27 #include "toolbox/mem/MemoryPoolFactory.h"
28 #include "toolbox/mem/exception/Exception.h"
29 
30 #include "xoap/MessageReference.h"
31 #include "xoap/MessageFactory.h"
32 #include "xoap/SOAPEnvelope.h"
33 #include "xoap/SOAPBody.h"
34 #include "xoap/domutils.h"
35 #include "xoap/Method.h"
36 
37 #include "cgicc/CgiDefs.h"
38 #include "cgicc/Cgicc.h"
39 #include "cgicc/FormEntry.h"
40 #include "cgicc/HTMLClasses.h"
41 
42 #include <signal.h>
43 #include <iostream>
44 #include <sstream>
45 
46 
47 using namespace std;
48 using namespace evf;
49 
50 
52 // construction/destruction
54 
55 //______________________________________________________________________________
56 FUResourceBroker::FUResourceBroker(xdaq::ApplicationStub *s)
57  : xdaq::Application(s)
58  , fsm_(this)
59  , gui_(0)
60  , log_(getApplicationLogger())
61  , bu_(0)
62  , sm_(0)
63  , i2oPool_(0)
64  , resourceTable_(0)
65  , wlMonitoring_(0)
66  , asMonitoring_(0)
67  , wlWatching_(0)
68  , asWatching_(0)
69  , instance_(0)
70  , runNumber_(0)
71  , deltaT_(0.0)
72  , deltaN_(0)
73  , deltaSumOfSquares_(0)
74  , deltaSumOfSizes_(0)
75  , throughput_(0.0)
76  , rate_(0.0)
77  , average_(0.0)
78  , rms_(0.0)
79  , nbAllocatedEvents_(0)
80  , nbPendingRequests_(0)
81  , nbReceivedEvents_(0)
82  , nbSentEvents_(0)
83  , nbSentDqmEvents_(0)
84  , nbSentErrorEvents_(0)
85  , nbPendingSMDiscards_(0)
86  , nbPendingSMDqmDiscards_(0)
87  , nbDiscardedEvents_(0)
88  , nbLostEvents_(0)
89  , nbDataErrors_(0)
90  , nbCrcErrors_(0)
91  , nbTimeoutsWithEvent_(0)
92  , nbTimeoutsWithoutEvent_(0)
93  , dataErrorFlag_(0)
94  , segmentationMode_(false)
95  , nbClients_(0)
96  , clientPrcIds_("")
97  , nbRawCells_(16)
98  , nbRecoCells_(8)
99  , nbDqmCells_(8)
100  , rawCellSize_(0x400000) // 4MB
101  , recoCellSize_(0x800000) // 8MB
102  , dqmCellSize_(0x800000) // 8MB
103  , doDropEvents_(false)
104  , doFedIdCheck_(true)
105  , doCrcCheck_(1)
106  , doDumpEvents_(0)
107  , buClassName_("BU")
108  , buInstance_(0)
109  , smClassName_("StorageManager")
110  , smInstance_(0)
111  , shmResourceTableTimeout_(200000)
112  , monSleepSec_(2)
113  , watchSleepSec_(10)
114  , timeOutSec_(30)
115  , processKillerEnabled_(true)
116  , useEvmBoard_(true)
117  , reasonForFailed_("")
118  , nbAllocateSent_(0)
119  , nbTakeReceived_(0)
120  , nbDataDiscardReceived_(0)
121  , nbDqmDiscardReceived_(0)
122  , nbSentLast_(0)
123  , sumOfSquaresLast_(0)
124  , sumOfSizesLast_(0)
125  , lock_(toolbox::BSem::FULL)
126  , frb_(0)
127  , shmInconsistent_(false)
128 {
129  // setup finite state machine (binding relevant callbacks)
131 
132  // set url, class, instance, and sourceId (=class_instance)
133  url_ =
134  getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
135  getApplicationDescriptor()->getURN();
136  class_ =getApplicationDescriptor()->getClassName();
137  instance_=getApplicationDescriptor()->getInstance();
138  sourceId_=class_.toString()+"_"+instance_.toString();
139 
140  // bind i2o callbacks
142  I2O_FU_TAKE,XDAQ_ORGANIZATION_ID);
144  I2O_FU_DATA_DISCARD,XDAQ_ORGANIZATION_ID);
146  I2O_FU_DQM_DISCARD,XDAQ_ORGANIZATION_ID);
148  I2O_EVM_LUMISECTION,XDAQ_ORGANIZATION_ID);
149 
150 
151  // bind HyperDAQ web pages
152  xgi::bind(this,&evf::FUResourceBroker::webPageRequest,"Default");
153  gui_=new WebGUI(this,&fsm_);
154  vector<toolbox::lang::Method*> methods=gui_->getMethods();
155  vector<toolbox::lang::Method*>::iterator it;
156  for (it=methods.begin();it!=methods.end();++it) {
157  if ((*it)->type()=="cgi") {
158  string name=static_cast<xgi::MethodSignature*>(*it)->name();
159  xgi::bind(this,&evf::FUResourceBroker::webPageRequest,name);
160  }
161  }
162  xgi::bind(this,&evf::FUResourceBroker::customWebPage,"customWebPage");
163 
164 
165  // allocate i2o memory pool
166  string i2oPoolName=sourceId_+"_i2oPool";
167  try {
168  toolbox::mem::HeapAllocator *allocator=new toolbox::mem::HeapAllocator();
169  toolbox::net::URN urn("toolbox-mem-pool",i2oPoolName);
170  toolbox::mem::MemoryPoolFactory* poolFactory=
171  toolbox::mem::getMemoryPoolFactory();
172  i2oPool_=poolFactory->createPool(urn,allocator);
173  }
175  string s="Failed to create pool: "+i2oPoolName;
176  LOG4CPLUS_FATAL(log_,s);
177  XCEPT_RETHROW(xcept::Exception,s,e);
178  }
179 
180  // publish all parameters to app info space
182 
183  // findRcmsStateListener
185 
186  // set application icon for hyperdaq
187  getApplicationDescriptor()->setAttribute("icon", "/evf/images/rbicon.jpg");
188  //FUResource::useEvmBoard_ = useEvmBoard_;
189 }
190 
191 
192 //______________________________________________________________________________
194 {
195 
196 }
197 
198 
199 
201 // implementation of member functions
203 
204 //______________________________________________________________________________
205 bool FUResourceBroker::configuring(toolbox::task::WorkLoop* wl)
206 {
207  try {
208  LOG4CPLUS_INFO(log_, "Start configuring ...");
210  frb_ = new EvffedFillerRB(this);
212  if(shmInconsistent_){
213  std::ostringstream ost;
214  ost << "configuring FAILED: Inconsistency in ResourceTable - nbRaw="
215  << nbRawCells_.value_ << " but nbResources=" << resourceTable_->nbResources()
216  << " and nbFreeSlots=" << resourceTable_->nbFreeSlots();
217  XCEPT_RAISE(evf::Exception,ost.str());
218  }
219  LOG4CPLUS_INFO(log_, "Finished configuring!");
220  fsm_.fireEvent("ConfigureDone",this);
221 
222  }
223  catch (xcept::Exception &e) {
224  std::string msg = "configuring FAILED: " + (string)e.what();
225  reasonForFailed_ = e.what();
226  fsm_.fireFailed(msg,this);
227  }
228 
229  return false;
230 }
231 
232 
233 //______________________________________________________________________________
234 bool FUResourceBroker::enabling(toolbox::task::WorkLoop* wl)
235 {
236  try {
237  LOG4CPLUS_INFO(log_, "Start enabling ...");
246 
249  dataErrorFlag_ = 0;
250 
251  LOG4CPLUS_INFO(log_, "Finished enabling!");
252  fsm_.fireEvent("EnableDone",this);
253  }
254  catch (xcept::Exception &e) {
255  std::string msg = "enabling FAILED: "+xcept::stdformat_exception_history(e);
256  reasonForFailed_ = e.what();
257  fsm_.fireFailed(msg,this);
258  }
259 
260  return false;
261 }
262 
263 
264 //______________________________________________________________________________
265 bool FUResourceBroker::stopping(toolbox::task::WorkLoop* wl)
266 {
267  try {
268  LOG4CPLUS_INFO(log_, "Start stopping :) ...");
269  resourceTable_->stop();
270  timeval now;
271  timeval then;
272  gettimeofday(&then,0);
273  while (!resourceTable_->isReadyToShutDown()) {
274  ::usleep(shmResourceTableTimeout_.value_*10);
275  gettimeofday(&now,0);
276  if ((unsigned int)(now.tv_sec-then.tv_sec) > shmResourceTableTimeout_.value_/10000) {
277  std::cout << "times: " << now.tv_sec << " " << then.tv_sec << " "
278  << shmResourceTableTimeout_.value_/10000 << std::endl;
279  LOG4CPLUS_WARN(log_, "Some Process did not detach - going to Emergency stop!");
280  emergencyStop();
281  break;
282  }
283  }
284 
286  LOG4CPLUS_INFO(log_, "Finished stopping!");
287  fsm_.fireEvent("StopDone",this);
288  }
289  }
290  catch (xcept::Exception &e) {
291  std::string msg = "stopping FAILED: "+xcept::stdformat_exception_history(e);
292  reasonForFailed_ = e.what();
293  fsm_.fireFailed(msg,this);
294  }
295 
296  return false;
297 }
298 
299 
300 //______________________________________________________________________________
301 bool FUResourceBroker::halting(toolbox::task::WorkLoop* wl)
302 {
303  try {
304  LOG4CPLUS_INFO(log_, "Start halting ...");
305  if (resourceTable_->isActive()) {
306  resourceTable_->halt();
307  UInt_t count = 0;
308  while (count<10) {
310  lock();
311  delete resourceTable_;
312  resourceTable_=0;
313  unlock();
314  LOG4CPLUS_INFO(log_,count+1<<". try to destroy resource table succeeded!");
315  break;
316  }
317  else {
318  count++;
319  LOG4CPLUS_DEBUG(log_,count<<". try to destroy resource table failed ...");
320  ::sleep(1);
321  }
322  }
323  }
324  else {
325  lock();
326  delete resourceTable_;
327  resourceTable_=0;
328  unlock();
329  }
330 
331  if (0==resourceTable_) {
332  LOG4CPLUS_INFO(log_,"Finished halting!");
333  fsm_.fireEvent("HaltDone",this);
334  }
335  else {
336  std::string msg = "halting FAILED: ResourceTable shutdown timed out.";
337  reasonForFailed_ = "RESOURCETABLE SHUTDOWN TIMED OUT";
338  fsm_.fireFailed(msg,this);
339  }
340  }
341  catch (xcept::Exception &e) {
342  std::string msg = "halting FAILED: "+xcept::stdformat_exception_history(e);
343  reasonForFailed_ = e.what();
344  fsm_.fireFailed(msg,this);
345  }
346  if(frb_) delete frb_;
347  return false;
348 }
349 
350 
351 //______________________________________________________________________________
352 xoap::MessageReference FUResourceBroker::fsmCallback(xoap::MessageReference msg)
354 {
355  return fsm_.commandCallback(msg);
356 }
357 
358 
359 //______________________________________________________________________________
360 void FUResourceBroker::I2O_FU_TAKE_Callback(toolbox::mem::Reference* bufRef)
361 {
362  nbTakeReceived_.value_++;
363  if(fsm_.checkIfEnabled()){
364  bool eventComplete=resourceTable_->buildResource(bufRef);
365  if (eventComplete&&doDropEvents_) resourceTable_->dropEvent();
366  }
367  else{
368  LOG4CPLUS_ERROR(log_,"TAKE i2o frame received in state "
369  << fsm_.stateName() << " is being lost");
370  bufRef->release();
371  }
372 }
373 
374 //______________________________________________________________________________
375 void FUResourceBroker::I2O_EVM_LUMISECTION_Callback(toolbox::mem::Reference* bufRef)
376 {
377 
378  I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *msg =
379  (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *)bufRef->getDataLocation();
380  if(msg->lumiSection==0){
381  LOG4CPLUS_ERROR(log_,"EOL message received for ls=0!!! ");
382  fsm_.fireFailed("EOL message received for ls=0!!! ",this);
383  }
384  resourceTable_->postEndOfLumiSection(bufRef); // this method dummy for now
385 // I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *msg =
386 // (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *)bufRef->getDataLocation();
387 
388 // LOG4CPLUS_WARN(log_, "Received END-OF-LS from EVM for LS " << msg->lumiSection);
389 
390 }
391 
392 
393 
394 
395 //______________________________________________________________________________
396 void FUResourceBroker::I2O_FU_DATA_DISCARD_Callback(toolbox::mem::Reference* bufRef)
397 {
398  nbDataDiscardReceived_.value_++;
400 }
401 
402 
403 //______________________________________________________________________________
404 void FUResourceBroker::I2O_FU_DQM_DISCARD_Callback(toolbox::mem::Reference* bufRef)
405 {
406  nbDqmDiscardReceived_.value_++;
408 }
409 
410 
411 //______________________________________________________________________________
413 {
414  typedef set<xdaq::ApplicationDescriptor*> AppDescSet_t;
415  typedef AppDescSet_t::iterator AppDescIter_t;
416 
417  // locate input BU
418  AppDescSet_t setOfBUs=
419  getApplicationContext()->getDefaultZone()->
420  getApplicationDescriptors(buClassName_.toString());
421 
422  if (0!=bu_) { delete bu_; bu_=0; }
423 
424  for (AppDescIter_t it=setOfBUs.begin();it!=setOfBUs.end();++it)
425  if ((*it)->getInstance()==buInstance_)
426  bu_=new BUProxy(getApplicationDescriptor(),*it,
427  getApplicationContext(),i2oPool_);
428 
429  if (0==bu_) {
430  string msg=sourceId_+" failed to locate input BU!";
431  XCEPT_RAISE(evf::Exception,msg);
432  }
433 
434  // locate output SM
435  AppDescSet_t setOfSMs=
436  getApplicationContext()->getDefaultZone()->
437  getApplicationDescriptors(smClassName_.toString());
438 
439  if (0!=sm_) { delete sm_; sm_=0; }
440 
441  for (AppDescIter_t it=setOfSMs.begin();it!=setOfSMs.end();++it)
442  if ((*it)->getInstance()==smInstance_)
443  sm_=new SMProxy(getApplicationDescriptor(),*it,
444  getApplicationContext(),i2oPool_);
445 
446  if (0==sm_) LOG4CPLUS_WARN(log_,sourceId_<<" failed to locate output SM!");
447 }
448 
449 
450 //______________________________________________________________________________
453 {
454  string name=in->getenv("PATH_INFO");
455  if (name.empty()) name="defaultWebPage";
456  static_cast<xgi::MethodSignature*>(gui_->getMethod(name))->invoke(in,out);
457 }
458 
459 
460 //______________________________________________________________________________
462 {
463  lock();
464 
465  if (0!=resourceTable_) {
466 
467  //gui_->monInfoSpace()->lock();
468 
469  if (e.type()=="urn:xdata-event:ItemGroupRetrieveEvent") {
485  dataErrorFlag_.value_ = (nbCrcErrors_.value_ != 0u +
486  ((nbDataErrors_.value_ != 0u) << 1) +
487  ((nbLostEvents_.value_ != 0u) << 2) +
488  ((nbTimeoutsWithEvent_.value_ != 0u) << 3) +
489  ((nbTimeoutsWithoutEvent_.value_ != 0u) << 4) +
490  ((nbSentErrorEvents_.value_ != 0u) << 5)
491  );
492  }
493  else if (e.type()=="ItemChangedEvent") {
494 
495  string item=dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
496 
497  if (item=="doFedIdCheck") FUResource::doFedIdCheck(doFedIdCheck_);
498  if (item=="useEvmBoard") FUResource::useEvmBoard(useEvmBoard_);
499  if (item=="doCrcCheck") resourceTable_->setDoCrcCheck(doCrcCheck_);
500  if (item=="doDumpEvents") resourceTable_->setDoDumpEvents(doDumpEvents_);
501  }
502 
503  //gui_->monInfoSpace()->unlock();
504  }
505  else {
506  nbClients_ =0;
507  clientPrcIds_ ="";
511  nbSentEvents_ =0;
512  nbSentDqmEvents_ =0;
517  nbLostEvents_ =0;
518  nbDataErrors_ =0;
519  nbCrcErrors_ =0;
520  nbAllocateSent_ =0;
521  }
522  unlock();
523 }
524 
525 
526 //______________________________________________________________________________
528 {
529  struct timezone timezone;
530  gettimeofday(&monStartTime_,&timezone);
531 
532  try {
534  toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+"Monitoring",
535  "waiting");
536  if (!wlMonitoring_->isActive()) wlMonitoring_->activate();
537  asMonitoring_=toolbox::task::bind(this,&FUResourceBroker::monitoring,
538  sourceId_+"Monitoring");
539  wlMonitoring_->submit(asMonitoring_);
540  }
541  catch (xcept::Exception& e) {
542  string msg = "Failed to start workloop 'Monitoring'.";
543  XCEPT_RETHROW(evf::Exception,msg,e);
544  }
545 }
546 
547 
548 //______________________________________________________________________________
549 bool FUResourceBroker::monitoring(toolbox::task::WorkLoop* wl)
550 {
551  unsigned int nbSent;
552  uint64_t sumOfSquares;
553  unsigned int sumOfSizes;
554  uint64_t deltaSumOfSquares;
555 
556  lock();
557  if (0==resourceTable_) {
558  deltaT_.value_ =0.0;
559  deltaN_.value_ = 0;
560  deltaSumOfSquares_.value_=0.0;
561  deltaSumOfSizes_.value_ = 0;
562  throughput_ =0.0;
563  rate_ =0.0;
564  average_ =0.0;
565  rms_ =0.0;
566  unlock();
567  return false;
568  }
569  else {
570  nbSent =resourceTable_->nbSent();
571  sumOfSquares=resourceTable_->sumOfSquares();
572  sumOfSizes =resourceTable_->sumOfSizes();
573  }
574  unlock();
575 
576  struct timeval monEndTime;
577  struct timezone timezone;
578 
579  gettimeofday(&monEndTime,&timezone);
580 
581  xdata::getInfoSpaceFactory()->lock();
582  gui_->monInfoSpace()->lock();
583 
584  deltaT_.value_=deltaT(&monStartTime_,&monEndTime);
585  monStartTime_=monEndTime;
586 
587  deltaN_.value_=nbSent-nbSentLast_;
588  nbSentLast_=nbSent;
589 
590  deltaSumOfSquares=sumOfSquares-sumOfSquaresLast_;
591  deltaSumOfSquares_.value_=(double)deltaSumOfSquares;
592  sumOfSquaresLast_=sumOfSquares;
593 
594  deltaSumOfSizes_.value_=sumOfSizes-sumOfSizesLast_;
595  sumOfSizesLast_=sumOfSizes;
596 
597  if (deltaT_.value_!=0) {
598  throughput_=deltaSumOfSizes_.value_/deltaT_.value_;
599  rate_ =deltaN_.value_/deltaT_.value_;
600  }
601  else {
602  throughput_=0.0;
603  rate_ =0.0;
604  }
605 
606  double meanOfSquares,mean,squareOfMean,variance;
607 
608  if(deltaN_.value_!=0) {
609  meanOfSquares=deltaSumOfSquares_.value_/((double)(deltaN_.value_));
610  mean=((double)(deltaSumOfSizes_.value_))/((double)(deltaN_.value_));
611  squareOfMean=mean*mean;
612  variance=meanOfSquares-squareOfMean; if(variance<0.0) variance=0.0;
613 
614  average_=deltaSumOfSizes_.value_/deltaN_.value_;
615  rms_ =std::sqrt(variance);
616  }
617  else {
618  average_=0.0;
619  rms_ =0.0;
620  }
621 
622  gui_->monInfoSpace()->unlock();
623  xdata::getInfoSpaceFactory()->unlock();
624 
625  ::sleep(monSleepSec_.value_);
626 
627  return true;
628 }
629 
630 
631 //______________________________________________________________________________
633 {
634  try {
635  wlWatching_=
636  toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+"Watching",
637  "waiting");
638  if (!wlWatching_->isActive()) wlWatching_->activate();
639  asWatching_=toolbox::task::bind(this,&FUResourceBroker::watching,
640  sourceId_+"Watching");
641  wlWatching_->submit(asWatching_);
642  }
643  catch (xcept::Exception& e) {
644  string msg = "Failed to start workloop 'Watching'.";
645  XCEPT_RETHROW(evf::Exception,msg,e);
646  }
647 }
648 
649 
650 //______________________________________________________________________________
651 bool FUResourceBroker::watching(toolbox::task::WorkLoop* wl)
652 {
653  lock();
654 
655  if (0==resourceTable_) {
656  unlock();
657  return false;
658  }
659 
660  vector<pid_t> evt_prcids =resourceTable_->cellPrcIds();
661  vector<UInt_t> evt_numbers=resourceTable_->cellEvtNumbers();
662  vector<time_t> evt_tstamps=resourceTable_->cellTimeStamps();
663 
664  time_t tcurr=time(0);
665  for (UInt_t i=0;i<evt_tstamps.size();i++) {
666  pid_t pid =evt_prcids[i];
667  UInt_t evt =evt_numbers[i];
668  time_t tstamp=evt_tstamps[i]; if (tstamp==0) continue;
669  double tdiff =difftime(tcurr,tstamp);
670  if (tdiff>timeOutSec_) {
672  LOG4CPLUS_ERROR(log_,"evt "<<evt<<" timed out, "<<"kill prc "<<pid);
673  kill(pid,9);
675  }
676  else {
677  LOG4CPLUS_INFO(log_,"evt "<<evt<<" under processing for more than "
678  <<timeOutSec_<<"sec for process "<<pid);
679  }
680  }
681  }
682 
683  vector<pid_t> prcids=resourceTable_->clientPrcIds();
684  for (UInt_t i=0;i<prcids.size();i++) {
685  pid_t pid =prcids[i];
686  int status=kill(pid,0);
687  if (status!=0) {
688  LOG4CPLUS_ERROR(log_,"EP prc "<<pid<<" died, send to error stream if processing.");
691  }
692  }
693 
695  std::ostringstream ost;
696  ost << "Watchdog spotted inconsistency in ResourceTable - nbRaw="
697  << nbRawCells_.value_ << " but nbResources=" << resourceTable_->nbResources()
698  << " and nbFreeSlots=" << resourceTable_->nbFreeSlots();
699  XCEPT_DECLARE(evf::Exception,
700  sentinelException, ost.str());
701  notifyQualified("error",sentinelException);
702  shmInconsistent_ = true;
703  }
704 
705  unlock();
706 
707  ::sleep(watchSleepSec_.value_);
708  return true;
709 }
710 
711 
712 //______________________________________________________________________________
714 {
715  assert(0!=gui_);
716 
717  gui_->addMonitorParam("url", &url_);
718  gui_->addMonitorParam("class", &class_);
719  gui_->addMonitorParam("instance", &instance_);
720  gui_->addMonitorParam("runNumber", &runNumber_);
721  gui_->addMonitorParam("stateName", fsm_.stateName());
722 
723  gui_->addMonitorParam("deltaT", &deltaT_);
724  gui_->addMonitorParam("deltaN", &deltaN_);
725  gui_->addMonitorParam("deltaSumOfSquares", &deltaSumOfSquares_);
726  gui_->addMonitorParam("deltaSumOfSizes", &deltaSumOfSizes_);
727 
728  gui_->addMonitorParam("throughput", &throughput_);
729  gui_->addMonitorParam("rate", &rate_);
730  gui_->addMonitorParam("average", &average_);
731  gui_->addMonitorParam("rms", &rms_);
732  gui_->addMonitorParam("dataErrorFlag", &dataErrorFlag_);
733 
734  gui_->addMonitorCounter("nbAllocatedEvents", &nbAllocatedEvents_);
735  gui_->addMonitorCounter("nbPendingRequests", &nbPendingRequests_);
736  gui_->addMonitorCounter("nbReceivedEvents", &nbReceivedEvents_);
737  gui_->addMonitorCounter("nbSentEvents", &nbSentEvents_);
738  gui_->addMonitorCounter("nbSentErrorEvents", &nbSentErrorEvents_);
739  gui_->addMonitorCounter("nbDiscardedEvents", &nbDiscardedEvents_);
740  gui_->addMonitorCounter("nbPendingSMDiscards", &nbPendingSMDiscards_);
741 
742  gui_->addMonitorCounter("nbSentDqmEvents", &nbSentDqmEvents_);
743  gui_->addMonitorCounter("nbDqmDiscardReceived", &nbDqmDiscardReceived_);
744  gui_->addMonitorCounter("nbPendingSMDqmDiscards", &nbPendingSMDqmDiscards_);
745 
746  gui_->addMonitorCounter("nbLostEvents", &nbLostEvents_);
747  gui_->addMonitorCounter("nbDataErrors", &nbDataErrors_);
748  gui_->addMonitorCounter("nbCrcErrors", &nbCrcErrors_);
749  gui_->addMonitorCounter("nbTimeoutsWithEvent", &nbTimeoutsWithEvent_);
750  gui_->addMonitorCounter("nbTimeoutsWithoutEvent", &nbTimeoutsWithoutEvent_);
751 
752  gui_->addStandardParam("segmentationMode", &segmentationMode_);
753  gui_->addStandardParam("nbClients", &nbClients_);
754  gui_->addStandardParam("clientPrcIds", &clientPrcIds_);
755  gui_->addStandardParam("nbRawCells", &nbRawCells_);
756  gui_->addStandardParam("nbRecoCells", &nbRecoCells_);
757  gui_->addStandardParam("nbDqmCells", &nbDqmCells_);
758  gui_->addStandardParam("rawCellSize", &rawCellSize_);
759  gui_->addStandardParam("recoCellSize", &recoCellSize_);
760  gui_->addStandardParam("dqmCellSize", &dqmCellSize_);
761 
762  gui_->addStandardParam("doDropEvents", &doDropEvents_);
763  gui_->addStandardParam("doFedIdCheck", &doFedIdCheck_);
764  gui_->addStandardParam("doCrcCheck", &doCrcCheck_);
765  gui_->addStandardParam("doDumpEvents", &doDumpEvents_);
766  gui_->addStandardParam("buClassName", &buClassName_);
767  gui_->addStandardParam("buInstance", &buInstance_);
768  gui_->addStandardParam("smClassName", &smClassName_);
769  gui_->addStandardParam("smInstance", &smInstance_);
770  gui_->addStandardParam("shmResourceTableTimeout", &shmResourceTableTimeout_);
771  gui_->addStandardParam("monSleepSec", &monSleepSec_);
772  gui_->addStandardParam("watchSleepSec", &watchSleepSec_);
773  gui_->addStandardParam("timeOutSec", &timeOutSec_);
774  gui_->addStandardParam("processKillerEnabled", &processKillerEnabled_);
775  gui_->addStandardParam("useEvmBoard", &useEvmBoard_);
776  gui_->addStandardParam("rcmsStateListener", fsm_.rcmsStateListener());
777  gui_->addStandardParam("foundRcmsStateListener", fsm_.foundRcmsStateListener());
778  gui_->addStandardParam("reasonForFailed", &reasonForFailed_);
779 
780  gui_->addDebugCounter("nbAllocateSent", &nbAllocateSent_);
781  gui_->addDebugCounter("nbTakeReceived", &nbTakeReceived_);
782  gui_->addDebugCounter("nbDataDiscardReceived", &nbDataDiscardReceived_);
783 
785 
786  gui_->addItemChangedListener("doFedIdCheck", this);
787  gui_->addItemChangedListener("useEvmBoard", this);
788  gui_->addItemChangedListener("doCrcCheck", this);
789  gui_->addItemChangedListener("doDumpEvents", this);
790 }
791 
792 
793 //______________________________________________________________________________
795 {
796  gui_->resetCounters();
797 
798  deltaT_ =0.0;
799  deltaN_ = 0;
800  deltaSumOfSquares_=0.0;
801  deltaSumOfSizes_ = 0;
802 
803  throughput_ =0.0;
804  rate_ =0.0;
805  average_ =0.0;
806  rms_ =0.0;
807 
808  nbSentLast_ = 0;
809  sumOfSquaresLast_ = 0;
810  sumOfSizesLast_ = 0;
811 }
812 
813 
814 //______________________________________________________________________________
815 double FUResourceBroker::deltaT(const struct timeval *start,
816  const struct timeval *end)
817 {
818  unsigned int sec;
819  unsigned int usec;
820 
821  sec = end->tv_sec - start->tv_sec;
822 
823  if(end->tv_usec > start->tv_usec) {
824  usec = end->tv_usec - start->tv_usec;
825  }
826  else {
827  sec--;
828  usec = 1000000 - ((unsigned int )(start->tv_usec - end->tv_usec));
829  }
830 
831  return ((double)sec) + ((double)usec) / 1000000.0;
832 }
833 
834 
835 
836 //______________________________________________________________________________
839 {
840  using namespace cgicc;
841  Cgicc cgi(in);
842  std::vector<FormEntry> els = cgi.getElements() ;
843  for(std::vector<FormEntry>::iterator it = els.begin(); it != els.end(); it++)
844  std::cout << "form entry " << (*it).getValue() << std::endl;
845 
846  std::vector<FormEntry> el1;
847  cgi.getElement("crcError",el1);
848  *out<<"<html>"<<endl;
849  gui_->htmlHead(in,out,sourceId_);
850  *out<<"<body>"<<endl;
851  gui_->htmlHeadline(in,out);
852 
853  lock();
854 
855  if (0!=resourceTable_) {
856  if(el1.size()!=0) {
857  resourceTable_->injectCRCError();
858  }
859  *out << "<form method=\"GET\" action=\"customWebPage\" >";
860  *out << "<button name=\"crcError\" type=\"submit\" value=\"injCRC\">Inject CRC</button>" << endl;
861  *out << "</form>" << endl;
862  *out << "<hr/>" << std::endl;
863  vector<pid_t> client_prc_ids = resourceTable_->clientPrcIds();
864  *out<<table().set("frame","void").set("rules","rows")
865  .set("class","modules").set("width","250")<<endl
866  <<tr()<<th("Client Processes").set("colspan","3")<<tr()<<endl
867  <<tr()
868  <<th("client").set("align","left")
869  <<th("process id").set("align","center")
870  <<th("status").set("align","center")
871  <<tr()
872  <<endl;
873  for (UInt_t i=0;i<client_prc_ids.size();i++) {
874 
875  pid_t pid =client_prc_ids[i];
876  int status=kill(pid,0);
877 
878  stringstream ssi; ssi<<i+1;
879  stringstream sspid; sspid<<pid;
880  stringstream ssstatus; ssstatus<<status;
881 
882  string bg_status = (status==0) ? "#00ff00" : "ff0000";
883  *out<<tr()
884  <<td(ssi.str()).set("align","left")
885  <<td(sspid.str()).set("align","center")
886  <<td(ssstatus.str()).set("align","center").set("bgcolor",bg_status)
887  <<tr()<<endl;
888  }
889  *out<<table()<<endl;
890  *out<<"<br><br>"<<endl;
891 
892  vector<string> states = resourceTable_->cellStates();
893  vector<UInt_t> evt_numbers = resourceTable_->cellEvtNumbers();
894  vector<pid_t> prc_ids = resourceTable_->cellPrcIds();
895  vector<time_t> time_stamps = resourceTable_->cellTimeStamps();
896 
897  *out<<table().set("frame","void").set("rules","rows")
898  .set("class","modules").set("width","500")<<endl
899  <<tr()<<th("Shared Memory Cells").set("colspan","6")<<tr()<<endl
900  <<tr()
901  <<th("cell").set("align","left")
902  <<th("state").set("align","center")
903  <<th("event").set("align","center")
904  <<th("process id").set("align","center")
905  <<th("timestamp").set("align","center")
906  <<th("time").set("align","center")
907  <<tr()
908  <<endl;
909  for (UInt_t i=0;i<states.size();i++) {
910  string state=states[i];
911  UInt_t evt = evt_numbers[i];
912  pid_t pid = prc_ids[i];
913  time_t tstamp= time_stamps[i];
914  double tdiff = difftime(time(0),tstamp);
915 
916  stringstream ssi; ssi<<i;
917  stringstream ssevt; if (evt!=0xffffffff) ssevt<<evt; else ssevt<<" - ";
918  stringstream sspid; if (pid!=0) sspid<<pid; else sspid<<" - ";
919  stringstream sststamp; if (tstamp!=0) sststamp<<tstamp; else sststamp<<" - ";
920  stringstream sstdiff; if (tstamp!=0) sstdiff<<tdiff; else sstdiff<<" - ";
921 
922  string bg_state = "#ffffff";
923  if (state=="RAWWRITING"||state=="RAWWRITTEN"||
924  state=="RAWREADING"||state=="RAWREAD")
925  bg_state="#99CCff";
926  else if (state=="PROCESSING")
927  bg_state="#ff0000";
928  else if (state=="PROCESSED"||state=="RECOWRITING"||state=="RECOWRITTEN")
929  bg_state="#CCff99";
930  else if (state=="SENDING")
931  bg_state="#00FF33";
932  else if (state=="SENT")
933  bg_state="#006633";
934  else if (state=="DISCARDING")
935  bg_state="#FFFF00";
936  else if (state=="LUMISECTION")
937  bg_state="#0000FF";
938 
939  *out<<tr()
940  <<td(ssi.str()).set("align","left")
941  <<td(state).set("align","center").set("bgcolor",bg_state)
942  <<td(ssevt.str()).set("align","center")
943  <<td(sspid.str()).set("align","center")
944  <<td(sststamp.str()).set("align","center")
945  <<td(sstdiff.str()).set("align","center")
946  <<tr()<<endl;
947  }
948  *out<<table()<<endl;
949  *out<<"<br><br>"<<endl;
950 
951  vector<string> dqmstates = resourceTable_->dqmCellStates();
952 
953  *out<<table().set("frame","void").set("rules","rows")
954  .set("class","modules").set("width","500")<<endl
955  <<tr()<<th("Shared Memory DQM Cells").set("colspan","6")<<tr()<<endl
956  <<tr()
957  <<th("cell").set("align","left")
958  <<th("state").set("align","center")
959  <<tr()
960  <<endl;
961  for (UInt_t i=0;i<dqmstates.size();i++) {
962  string state=dqmstates[i];
963 
964  string bg_state = "#ffffff";
965  if (state=="WRITING"||state=="WRITTEN")
966  bg_state="#99CCff";
967  else if (state=="SENDING")
968  bg_state="#00FF33";
969  else if (state=="SENT")
970  bg_state="#006633";
971  else if (state=="DISCARDING")
972  bg_state="#FFFF00";
973 
974  *out<<tr()
975  <<td(state).set("align","center").set("bgcolor",bg_state)
976  <<tr()<<endl;
977  }
978  *out<<table()<<endl;
979 
980 
981 
982  }
983  *out<<"</body>"<<endl<<"</html>"<<endl;
984 
985  unlock();
986 }
987 
989 {
990  LOG4CPLUS_WARN(log_, "in Emergency stop - handle non-clean stops");
991  vector<pid_t> client_prc_ids = resourceTable_->clientPrcIds();
992  for (UInt_t i=0;i<client_prc_ids.size();i++) {
993  pid_t pid =client_prc_ids[i];
994  std::cout << "B: killing process " << i << "pid=" << pid << std::endl;
995  if(pid!=0){
996  //assume processes are dead by now
999  else
1001  }
1002  }
1004  ::sleep(1);
1006  {
1007  reasonForFailed_ = "EmergencyStop: failed to shut down ResourceTable";
1008  XCEPT_RAISE(evf::Exception,reasonForFailed_);
1009  }
1011  lock();
1012  std::cout << "delete resourcetable" <<std::endl;
1013  delete resourceTable_;
1014  resourceTable_=0;
1015  std::cout << "cycle through resourcetable config " << std::endl;
1017  unlock();
1018  if(shmInconsistent_) XCEPT_RAISE(evf::Exception,"Inconsistent shm state");
1019  std::cout << "done with emergency stop" << std::endl;
1020 }
1021 
1023 {
1025  nbRawCells_.value_,
1026  nbRecoCells_.value_,
1027  nbDqmCells_.value_,
1028  rawCellSize_.value_,
1029  recoCellSize_.value_,
1030  dqmCellSize_.value_,
1031  bu_,sm_,
1032  log_,
1033  shmResourceTableTimeout_.value_,
1034  frb_,
1035  this);
1040  reset();
1041  shmInconsistent_ = false;
1042  if(resourceTable_->nbResources() != nbRawCells_.value_ ||
1043  resourceTable_->nbFreeSlots() != nbRawCells_.value_)
1044  shmInconsistent_ = true;
1045 }
1046 
1047 
1049 // XDAQ instantiator implementation macro
1051 
1052 XDAQ_INSTANTIATOR_IMPL(FUResourceBroker)
list table
Definition: asciidump.py:386
static const char runNumber_[]
xdata::String clientPrcIds_
bool discardDataEvent(MemRef_t *bufRef)
xdata::UnsignedInteger32 nbLostEvents_
UInt_t nbFreeSlots() const
void addDebugCounter(CString_t &name, Counter_t *counter)
Definition: WebGUI.cc:189
int i
Definition: DBlmapReader.cc:9
void resetCounters()
Definition: WebGUI.cc:217
std::vector< UInt_t > cellEvtNumbers() const
#define Input(cl)
Definition: vmac.h:189
xdata::UnsignedInteger32 monSleepSec_
xdata::UnsignedInteger32 nbDiscardedEvents_
xdata::UnsignedInteger32 nbReceivedEvents_
xdata::UnsignedInteger32 nbPendingSMDqmDiscards_
xdata::UnsignedInteger32 nbPendingRequests_
xdata::UnsignedInteger32 nbSentDqmEvents_
xdata::UnsignedInteger32 nbRawCells_
void addStandardParam(CString_t &name, Param_t *param)
Definition: WebGUI.cc:134
bool handleCrashedEP(UInt_t runNumber, pid_t pid)
void exportParameters()
Definition: WebGUI.cc:200
void I2O_FU_TAKE_Callback(toolbox::mem::Reference *bufRef)
void setDoDumpEvents(UInt_t doDumpEvents)
bool isActive() const
xdata::UnsignedInteger32 nbTimeoutsWithoutEvent_
void customWebPage(xgi::Input *in, xgi::Output *out)
xdata::UnsignedInteger32 nbSentEvents_
xdata::UnsignedInteger32 nbSentErrorEvents_
xdata::UnsignedInteger32 buInstance_
xdata::UnsignedInteger32 nbDataDiscardReceived_
bool watching(toolbox::task::WorkLoop *wl)
UInt_t nbErrors() const
bool buildResource(MemRef_t *bufRef)
tuple els
Definition: asciidump.py:420
void addMonitorParam(CString_t &name, Param_t *param)
Definition: WebGUI.cc:145
void addItemChangedListener(CString_t &name, xdata::ActionListener *l)
Definition: WebGUI.cc:238
xdata::String reasonForFailed_
void I2O_FU_DATA_DISCARD_Callback(toolbox::mem::Reference *bufRef)
#define I2O_FU_DQM_DISCARD
Definition: i2oEvfMsgs.h:27
UInt_t nbLost() const
double deltaT(const struct timeval *start, const struct timeval *end)
static void useEvmBoard(bool useEvmBoard)
Definition: FUResource.h:57
void sleep(Duration_t)
Definition: Utils.h:163
xdata::Boolean segmentationMode_
UInt_t nbClients() const
void I2O_FU_DQM_DISCARD_Callback(toolbox::mem::Reference *bufRef)
xdata::UnsignedInteger32 nbDqmDiscardReceived_
void initialize(T *app)
Definition: StateMachine.h:84
xdata::UnsignedInteger32 nbDataErrors_
xdata::UnsignedInteger32 doDumpEvents_
xdata::UnsignedInteger32 nbClients_
xdata::Bag< xdaq2rc::ClassnameAndInstance > * rcmsStateListener()
Definition: StateMachine.h:72
void fireFailed(const std::string &errorMsg, void *originator)
bool halting(toolbox::task::WorkLoop *wl)
xdata::UnsignedInteger32 runNumber_
bool configuring(toolbox::task::WorkLoop *wl)
xdata::UnsignedInteger32 smInstance_
UInt_t nbDiscarded() const
uint64_t sumOfSquares() const
bool isReadyToShutDown() const
toolbox::task::WorkLoop * wlMonitoring_
EvffedFillerRB * frb_
UInt_t nbPendingSMDqmDiscards() const
bool stopping(toolbox::task::WorkLoop *wl)
T sqrt(T t)
Definition: SSEVec.h:28
xdata::Boolean doFedIdCheck_
UInt_t sumOfSizes() const
std::vector< time_t > cellTimeStamps() const
std::vector< pid_t > clientPrcIds() const
xdata::UnsignedInteger32 nbTakeReceived_
UInt_t nbAllocated() const
xdata::UnsignedInteger32 timeOutSec_
xdata::UnsignedInteger32 rawCellSize_
xdata::UnsignedInteger32 nbRecoCells_
#define end
Definition: vmac.h:38
UInt_t nbCrcErrors() const
xdata::UnsignedInteger32 nbDqmCells_
UInt_t nbResources() const
void setRunNumber(UInt_t runNumber)
bool enabling(toolbox::task::WorkLoop *wl)
UInt_t nbSentError() const
void fireEvent(const std::string &evtType, void *originator)
#define I2O_FU_DATA_DISCARD
Definition: i2oEvfMsgs.h:26
tuple out
Definition: dbtoconf.py:99
xdata::UnsignedInteger32 nbAllocatedEvents_
void actionPerformed(xdata::Event &e)
unsigned int UInt_t
Definition: FUTypes.h:12
xdata::UnsignedInteger32 nbTimeoutsWithEvent_
xdata::Boolean processKillerEnabled_
xdata::UnsignedInteger32 nbPendingSMDiscards_
std::string clientPrcIdsAsString() const
FUResourceTable * resourceTable_
xdata::UnsignedInteger32 deltaSumOfSizes_
xdata::Boolean useEvmBoard_
UInt_t nbCompleted() const
xdata::UnsignedInteger32 dqmCellSize_
unsigned long long uint64_t
Definition: Time.h:15
xdata::String * stateName()
Definition: StateMachine.h:69
xdata::InfoSpace * monInfoSpace()
Definition: WebGUI.h:72
xdata::Double deltaSumOfSquares_
UInt_t nbSent() const
xdata::Boolean * foundRcmsStateListener()
Definition: StateMachine.h:78
UInt_t nbPendingSMDiscards() const
char state
Definition: procUtils.cc:75
xoap::MessageReference fsmCallback(xoap::MessageReference msg)
xdata::String smClassName_
UInt_t nbAllocSent() const
std::vector< pid_t > cellPrcIds() const
xdata::UnsignedInteger32 deltaN_
#define Output(cl)
Definition: vmac.h:193
xdata::UnsignedInteger32 recoCellSize_
xdata::String buClassName_
UInt_t nbSentDqm() const
xdata::UnsignedInteger32 watchSleepSec_
xdata::UnsignedInteger32 shmResourceTableTimeout_
static void doFedIdCheck(bool doFedIdCheck)
Definition: FUResource.h:55
void I2O_EVM_LUMISECTION_Callback(toolbox::mem::Reference *bufRef)
tuple cout
Definition: gather_cfg.py:41
void addMonitorCounter(CString_t &name, Counter_t *counter)
Definition: WebGUI.cc:178
bool monitoring(toolbox::task::WorkLoop *wl)
void postEndOfLumiSection(MemRef_t *bufRef)
string s
Definition: asciidump.py:422
toolbox::mem::Pool * i2oPool_
tuple status
Definition: ntuplemaker.py:245
toolbox::task::WorkLoop * wlWatching_
xdata::Boolean doDropEvents_
xdata::UnsignedInteger32 doCrcCheck_
xdata::UnsignedInteger32 instance_
toolbox::task::ActionSignature * asMonitoring_
bool discardDqmEvent(MemRef_t *bufRef)
xdata::UnsignedInteger32 nbCrcErrors_
void webPageRequest(xgi::Input *in, xgi::Output *out)
toolbox::task::ActionSignature * asWatching_
xdata::UnsignedInteger32 nbAllocateSent_
struct timeval monStartTime_
void setDoCrcCheck(UInt_t doCrcCheck)
void findRcmsStateListener()
xdata::UnsignedInteger32 dataErrorFlag_
evf::StateMachine fsm_
UInt_t nbPending() const