20 #include "i2o/Method.h"
21 #include "interface/shared/i2oXFunctionCodes.h"
22 #include "interface/evb/i2oEVBMsgs.h"
23 #include "xcept/tools.h"
25 #include "toolbox/mem/HeapAllocator.h"
26 #include "toolbox/mem/Reference.h"
27 #include "toolbox/mem/MemoryPoolFactory.h"
28 #include "toolbox/mem/exception/Exception.h"
30 #include "xoap/MessageReference.h"
31 #include "xoap/MessageFactory.h"
32 #include "xoap/SOAPEnvelope.h"
33 #include "xoap/SOAPBody.h"
34 #include "xoap/domutils.h"
35 #include "xoap/Method.h"
37 #include "cgicc/CgiDefs.h"
38 #include "cgicc/Cgicc.h"
39 #include "cgicc/FormEntry.h"
40 #include "cgicc/HTMLClasses.h"
56 FUResourceBroker::FUResourceBroker(xdaq::ApplicationStub *
s)
57 : xdaq::Application(s)
60 , log_(getApplicationLogger())
73 , deltaSumOfSquares_(0)
79 , nbAllocatedEvents_(0)
80 , nbPendingRequests_(0)
81 , nbReceivedEvents_(0)
84 , nbSentErrorEvents_(0)
85 , nbPendingSMDiscards_(0)
86 , nbPendingSMDqmDiscards_(0)
87 , nbDiscardedEvents_(0)
91 , nbTimeoutsWithEvent_(0)
92 , nbTimeoutsWithoutEvent_(0)
94 , segmentationMode_(
false)
100 , rawCellSize_(0x400000)
101 , recoCellSize_(0x800000)
102 , dqmCellSize_(0x800000)
103 , doDropEvents_(
false)
104 , doFedIdCheck_(
true)
109 , smClassName_(
"StorageManager")
111 , shmResourceTableTimeout_(200000)
115 , processKillerEnabled_(
true)
117 , reasonForFailed_(
"")
120 , nbDataDiscardReceived_(0)
121 , nbDqmDiscardReceived_(0)
123 , sumOfSquaresLast_(0)
125 , lock_(toolbox::BSem::FULL)
127 , shmInconsistent_(
false)
134 getApplicationDescriptor()->getContextDescriptor()->getURL()+
"/"+
135 getApplicationDescriptor()->getURN();
136 class_ =getApplicationDescriptor()->getClassName();
137 instance_=getApplicationDescriptor()->getInstance();
142 I2O_FU_TAKE,XDAQ_ORGANIZATION_ID);
148 I2O_EVM_LUMISECTION,XDAQ_ORGANIZATION_ID);
154 vector<toolbox::lang::Method*> methods=
gui_->getMethods();
155 vector<toolbox::lang::Method*>::iterator it;
156 for (it=methods.begin();it!=methods.end();++it) {
157 if ((*it)->type()==
"cgi") {
158 string name=
static_cast<xgi::MethodSignature*
>(*it)->name();
168 toolbox::mem::HeapAllocator *allocator=
new toolbox::mem::HeapAllocator();
169 toolbox::net::URN urn(
"toolbox-mem-pool",i2oPoolName);
170 toolbox::mem::MemoryPoolFactory* poolFactory=
171 toolbox::mem::getMemoryPoolFactory();
172 i2oPool_=poolFactory->createPool(urn,allocator);
175 string s=
"Failed to create pool: "+i2oPoolName;
176 LOG4CPLUS_FATAL(
log_,s);
187 getApplicationDescriptor()->setAttribute(
"icon",
"/evf/images/rbicon.jpg");
208 LOG4CPLUS_INFO(
log_,
"Start configuring ...");
213 std::ostringstream ost;
214 ost <<
"configuring FAILED: Inconsistency in ResourceTable - nbRaw="
219 LOG4CPLUS_INFO(
log_,
"Finished configuring!");
224 std::string
msg =
"configuring FAILED: " + (string)e.what();
237 LOG4CPLUS_INFO(
log_,
"Start enabling ...");
251 LOG4CPLUS_INFO(
log_,
"Finished enabling!");
255 std::string
msg =
"enabling FAILED: "+xcept::stdformat_exception_history(e);
268 LOG4CPLUS_INFO(
log_,
"Start stopping :) ...");
272 gettimeofday(&then,0);
275 gettimeofday(&now,0);
277 std::cout <<
"times: " << now.tv_sec <<
" " << then.tv_sec <<
" "
279 LOG4CPLUS_WARN(
log_,
"Some Process did not detach - going to Emergency stop!");
286 LOG4CPLUS_INFO(
log_,
"Finished stopping!");
291 std::string
msg =
"stopping FAILED: "+xcept::stdformat_exception_history(e);
304 LOG4CPLUS_INFO(
log_,
"Start halting ...");
314 LOG4CPLUS_INFO(
log_,count+1<<
". try to destroy resource table succeeded!");
319 LOG4CPLUS_DEBUG(
log_,count<<
". try to destroy resource table failed ...");
332 LOG4CPLUS_INFO(
log_,
"Finished halting!");
336 std::string
msg =
"halting FAILED: ResourceTable shutdown timed out.";
342 std::string
msg =
"halting FAILED: "+xcept::stdformat_exception_history(e);
355 return fsm_.commandCallback(
msg);
368 LOG4CPLUS_ERROR(
log_,
"TAKE i2o frame received in state "
378 I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *
msg =
379 (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *)bufRef->getDataLocation();
380 if(msg->lumiSection==0){
381 LOG4CPLUS_ERROR(
log_,
"EOL message received for ls=0!!! ");
414 typedef set<xdaq::ApplicationDescriptor*> AppDescSet_t;
415 typedef AppDescSet_t::iterator AppDescIter_t;
418 AppDescSet_t setOfBUs=
419 getApplicationContext()->getDefaultZone()->
424 for (AppDescIter_t it=setOfBUs.begin();it!=setOfBUs.end();++it)
426 bu_=
new BUProxy(getApplicationDescriptor(),*it,
435 AppDescSet_t setOfSMs=
436 getApplicationContext()->getDefaultZone()->
441 for (AppDescIter_t it=setOfSMs.begin();it!=setOfSMs.end();++it)
443 sm_=
new SMProxy(getApplicationDescriptor(),*it,
454 string name=
in->getenv(
"PATH_INFO");
455 if (name.empty()) name=
"defaultWebPage";
456 static_cast<xgi::MethodSignature*
>(gui_->getMethod(name))->invoke(
in,
out);
469 if (e.type()==
"urn:xdata-event:ItemGroupRetrieveEvent") {
493 else if (e.type()==
"ItemChangedEvent") {
495 string item=
dynamic_cast<xdata::ItemChangedEvent&
>(
e).itemName();
529 struct timezone timezone;
534 toolbox::task::getWorkLoopFactory()->getWorkLoop(
sourceId_+
"Monitoring",
542 string msg =
"Failed to start workloop 'Monitoring'.";
553 unsigned int sumOfSizes;
576 struct timeval monEndTime;
577 struct timezone timezone;
579 gettimeofday(&monEndTime,&timezone);
581 xdata::getInfoSpaceFactory()->lock();
592 sumOfSquaresLast_=sumOfSquares;
595 sumOfSizesLast_=sumOfSizes;
606 double meanOfSquares,
mean,squareOfMean,variance;
611 squareOfMean=mean*
mean;
612 variance=meanOfSquares-squareOfMean;
if(variance<0.0) variance=0.0;
623 xdata::getInfoSpaceFactory()->unlock();
636 toolbox::task::getWorkLoopFactory()->getWorkLoop(
sourceId_+
"Watching",
644 string msg =
"Failed to start workloop 'Watching'.";
664 time_t tcurr=
time(0);
665 for (
UInt_t i=0;
i<evt_tstamps.size();
i++) {
666 pid_t
pid =evt_prcids[
i];
668 time_t tstamp=evt_tstamps[
i];
if (tstamp==0)
continue;
669 double tdiff =difftime(tcurr,tstamp);
672 LOG4CPLUS_ERROR(
log_,
"evt "<<evt<<
" timed out, "<<
"kill prc "<<pid);
677 LOG4CPLUS_INFO(
log_,
"evt "<<evt<<
" under processing for more than "
685 pid_t
pid =prcids[
i];
688 LOG4CPLUS_ERROR(
log_,
"EP prc "<<pid<<
" died, send to error stream if processing.");
695 std::ostringstream ost;
696 ost <<
"Watchdog spotted inconsistency in ResourceTable - nbRaw="
700 sentinelException, ost.str());
701 notifyQualified(
"error",sentinelException);
816 const struct timeval *
end)
821 sec = end->tv_sec - start->tv_sec;
823 if(end->tv_usec > start->tv_usec) {
824 usec = end->tv_usec - start->tv_usec;
828 usec = 1000000 - ((
unsigned int )(start->tv_usec - end->tv_usec));
831 return ((
double)sec) + ((
double)usec) / 1000000.0;
840 using namespace cgicc;
842 std::vector<FormEntry>
els = cgi.getElements() ;
843 for(std::vector<FormEntry>::iterator it =
els.begin(); it !=
els.end(); it++)
844 std::cout <<
"form entry " << (*it).getValue() << std::endl;
846 std::vector<FormEntry> el1;
847 cgi.getElement(
"crcError",el1);
848 *
out<<
"<html>"<<endl;
849 gui_->htmlHead(
in,
out,sourceId_);
850 *
out<<
"<body>"<<endl;
851 gui_->htmlHeadline(
in,
out);
855 if (0!=resourceTable_) {
857 resourceTable_->injectCRCError();
859 *
out <<
"<form method=\"GET\" action=\"customWebPage\" >";
860 *
out <<
"<button name=\"crcError\" type=\"submit\" value=\"injCRC\">Inject CRC</button>" << endl;
861 *
out <<
"</form>" << endl;
862 *
out <<
"<hr/>" << std::endl;
863 vector<pid_t> client_prc_ids = resourceTable_->clientPrcIds();
864 *
out<<
table().set(
"frame",
"void").set(
"rules",
"rows")
865 .set(
"class",
"modules").set(
"width",
"250")<<endl
866 <<tr()<<th(
"Client Processes").set(
"colspan",
"3")<<tr()<<endl
868 <<th(
"client").set(
"align",
"left")
869 <<th(
"process id").set(
"align",
"center")
870 <<th(
"status").set(
"align",
"center")
873 for (
UInt_t i=0;
i<client_prc_ids.size();
i++) {
875 pid_t
pid =client_prc_ids[
i];
878 stringstream ssi; ssi<<
i+1;
879 stringstream sspid; sspid<<
pid;
880 stringstream ssstatus; ssstatus<<
status;
882 string bg_status = (status==0) ?
"#00ff00" :
"ff0000";
884 <<td(ssi.str()).set(
"align",
"left")
885 <<td(sspid.str()).set(
"align",
"center")
886 <<td(ssstatus.str()).set(
"align",
"center").set(
"bgcolor",bg_status)
890 *
out<<
"<br><br>"<<endl;
892 vector<string> states = resourceTable_->cellStates();
893 vector<UInt_t> evt_numbers = resourceTable_->cellEvtNumbers();
894 vector<pid_t> prc_ids = resourceTable_->cellPrcIds();
895 vector<time_t> time_stamps = resourceTable_->cellTimeStamps();
897 *
out<<
table().set(
"frame",
"void").set(
"rules",
"rows")
898 .set(
"class",
"modules").set(
"width",
"500")<<endl
899 <<tr()<<th(
"Shared Memory Cells").set(
"colspan",
"6")<<tr()<<endl
901 <<th(
"cell").set(
"align",
"left")
902 <<th(
"state").set(
"align",
"center")
903 <<th(
"event").set(
"align",
"center")
904 <<th(
"process id").set(
"align",
"center")
905 <<th(
"timestamp").set(
"align",
"center")
906 <<th(
"time").set(
"align",
"center")
912 pid_t
pid = prc_ids[
i];
913 time_t tstamp= time_stamps[
i];
914 double tdiff = difftime(
time(0),tstamp);
916 stringstream ssi; ssi<<
i;
917 stringstream ssevt;
if (evt!=0xffffffff) ssevt<<evt;
else ssevt<<
" - ";
918 stringstream sspid;
if (pid!=0) sspid<<
pid;
else sspid<<
" - ";
919 stringstream sststamp;
if (tstamp!=0) sststamp<<tstamp;
else sststamp<<
" - ";
920 stringstream sstdiff;
if (tstamp!=0) sstdiff<<tdiff;
else sstdiff<<
" - ";
922 string bg_state =
"#ffffff";
923 if (state==
"RAWWRITING"||state==
"RAWWRITTEN"||
924 state==
"RAWREADING"||state==
"RAWREAD")
926 else if (state==
"PROCESSING")
928 else if (state==
"PROCESSED"||state==
"RECOWRITING"||state==
"RECOWRITTEN")
930 else if (state==
"SENDING")
932 else if (state==
"SENT")
934 else if (state==
"DISCARDING")
936 else if (state==
"LUMISECTION")
940 <<td(ssi.str()).set(
"align",
"left")
941 <<td(state).set(
"align",
"center").set(
"bgcolor",bg_state)
942 <<td(ssevt.str()).set(
"align",
"center")
943 <<td(sspid.str()).set(
"align",
"center")
944 <<td(sststamp.str()).set(
"align",
"center")
945 <<td(sstdiff.str()).set(
"align",
"center")
949 *
out<<
"<br><br>"<<endl;
951 vector<string> dqmstates = resourceTable_->dqmCellStates();
953 *
out<<
table().set(
"frame",
"void").set(
"rules",
"rows")
954 .set(
"class",
"modules").set(
"width",
"500")<<endl
955 <<tr()<<th(
"Shared Memory DQM Cells").set(
"colspan",
"6")<<tr()<<endl
957 <<th(
"cell").set(
"align",
"left")
958 <<th(
"state").set(
"align",
"center")
961 for (
UInt_t i=0;
i<dqmstates.size();
i++) {
962 string state=dqmstates[
i];
964 string bg_state =
"#ffffff";
965 if (state==
"WRITING"||state==
"WRITTEN")
967 else if (state==
"SENDING")
969 else if (state==
"SENT")
971 else if (state==
"DISCARDING")
975 <<td(state).set(
"align",
"center").set(
"bgcolor",bg_state)
983 *
out<<
"</body>"<<endl<<
"</html>"<<endl;
990 LOG4CPLUS_WARN(
log_,
"in Emergency stop - handle non-clean stops");
992 for (
UInt_t i=0;
i<client_prc_ids.size();
i++) {
993 pid_t
pid =client_prc_ids[
i];
994 std::cout <<
"B: killing process " <<
i <<
"pid=" << pid << std::endl;
1012 std::cout <<
"delete resourcetable" <<std::endl;
1015 std::cout <<
"cycle through resourcetable config " << std::endl;
1019 std::cout <<
"done with emergency stop" << std::endl;
static const char runNumber_[]
xdata::String clientPrcIds_
bool discardDataEvent(MemRef_t *bufRef)
xdata::UnsignedInteger32 nbLostEvents_
UInt_t nbFreeSlots() const
void addDebugCounter(CString_t &name, Counter_t *counter)
std::vector< UInt_t > cellEvtNumbers() const
xdata::UnsignedInteger32 monSleepSec_
xdata::UnsignedInteger32 nbDiscardedEvents_
xdata::UnsignedInteger32 nbReceivedEvents_
xdata::UnsignedInteger32 nbPendingSMDqmDiscards_
xdata::UnsignedInteger32 nbPendingRequests_
xdata::UnsignedInteger32 nbSentDqmEvents_
xdata::UnsignedInteger32 nbRawCells_
virtual ~FUResourceBroker()
void addStandardParam(CString_t &name, Param_t *param)
bool handleCrashedEP(UInt_t runNumber, pid_t pid)
void I2O_FU_TAKE_Callback(toolbox::mem::Reference *bufRef)
void setDoDumpEvents(UInt_t doDumpEvents)
xdata::UnsignedInteger32 nbTimeoutsWithoutEvent_
void customWebPage(xgi::Input *in, xgi::Output *out)
xdata::UnsignedInteger32 nbSentEvents_
xdata::UnsignedInteger32 nbSentErrorEvents_
uint64_t sumOfSquaresLast_
void configureResources()
xdata::UnsignedInteger32 buInstance_
void startMonitoringWorkLoop()
xdata::UnsignedInteger32 nbDataDiscardReceived_
bool watching(toolbox::task::WorkLoop *wl)
bool buildResource(MemRef_t *bufRef)
void addMonitorParam(CString_t &name, Param_t *param)
void addItemChangedListener(CString_t &name, xdata::ActionListener *l)
xdata::String reasonForFailed_
void I2O_FU_DATA_DISCARD_Callback(toolbox::mem::Reference *bufRef)
#define I2O_FU_DQM_DISCARD
double deltaT(const struct timeval *start, const struct timeval *end)
static void useEvmBoard(bool useEvmBoard)
xdata::Boolean segmentationMode_
void I2O_FU_DQM_DISCARD_Callback(toolbox::mem::Reference *bufRef)
xdata::UnsignedInteger32 nbDqmDiscardReceived_
xdata::UnsignedInteger32 nbDataErrors_
xdata::UnsignedInteger32 doDumpEvents_
xdata::UnsignedInteger32 nbClients_
xdata::Bag< xdaq2rc::ClassnameAndInstance > * rcmsStateListener()
void fireFailed(const std::string &errorMsg, void *originator)
bool halting(toolbox::task::WorkLoop *wl)
xdata::UnsignedInteger32 runNumber_
bool configuring(toolbox::task::WorkLoop *wl)
xdata::UnsignedInteger32 smInstance_
UInt_t nbDiscarded() const
uint64_t sumOfSquares() const
bool isReadyToShutDown() const
toolbox::task::WorkLoop * wlMonitoring_
UInt_t nbPendingSMDqmDiscards() const
bool stopping(toolbox::task::WorkLoop *wl)
xdata::Boolean doFedIdCheck_
UInt_t sumOfSizes() const
std::vector< time_t > cellTimeStamps() const
std::vector< pid_t > clientPrcIds() const
xdata::UnsignedInteger32 nbTakeReceived_
UInt_t nbAllocated() const
xdata::UnsignedInteger32 timeOutSec_
void startSendDqmWorkLoop()
void startSendDataWorkLoop()
xdata::UnsignedInteger32 rawCellSize_
xdata::UnsignedInteger32 nbRecoCells_
UInt_t nbCrcErrors() const
xdata::UnsignedInteger32 nbDqmCells_
UInt_t nbResources() const
void setRunNumber(UInt_t runNumber)
bool enabling(toolbox::task::WorkLoop *wl)
UInt_t nbSentError() const
void fireEvent(const std::string &evtType, void *originator)
#define I2O_FU_DATA_DISCARD
void startWatchingWorkLoop()
xdata::UnsignedInteger32 nbAllocatedEvents_
void actionPerformed(xdata::Event &e)
xdata::UnsignedInteger32 nbTimeoutsWithEvent_
xdata::Boolean processKillerEnabled_
xdata::UnsignedInteger32 nbPendingSMDiscards_
std::string clientPrcIdsAsString() const
FUResourceTable * resourceTable_
xdata::UnsignedInteger32 deltaSumOfSizes_
xdata::Boolean useEvmBoard_
UInt_t nbCompleted() const
xdata::UnsignedInteger32 dqmCellSize_
unsigned long long uint64_t
void printWorkLoopStatus()
xdata::String * stateName()
xdata::InfoSpace * monInfoSpace()
xdata::Double deltaSumOfSquares_
xdata::Boolean * foundRcmsStateListener()
UInt_t nbPendingSMDiscards() const
xoap::MessageReference fsmCallback(xoap::MessageReference msg)
xdata::String smClassName_
UInt_t nbAllocSent() const
std::vector< pid_t > cellPrcIds() const
xdata::UnsignedInteger32 deltaN_
xdata::UnsignedInteger32 recoCellSize_
xdata::String buClassName_
xdata::UnsignedInteger32 watchSleepSec_
xdata::UnsignedInteger32 shmResourceTableTimeout_
static void doFedIdCheck(bool doFedIdCheck)
void I2O_EVM_LUMISECTION_Callback(toolbox::mem::Reference *bufRef)
void startDiscardWorkLoop()
void addMonitorCounter(CString_t &name, Counter_t *counter)
bool monitoring(toolbox::task::WorkLoop *wl)
void postEndOfLumiSection(MemRef_t *bufRef)
toolbox::mem::Pool * i2oPool_
xdata::Double throughput_
toolbox::task::WorkLoop * wlWatching_
xdata::Boolean doDropEvents_
xdata::UnsignedInteger32 doCrcCheck_
xdata::UnsignedInteger32 instance_
toolbox::task::ActionSignature * asMonitoring_
bool discardDqmEvent(MemRef_t *bufRef)
xdata::UnsignedInteger32 nbCrcErrors_
void webPageRequest(xgi::Input *in, xgi::Output *out)
toolbox::task::ActionSignature * asWatching_
xdata::UnsignedInteger32 nbAllocateSent_
struct timeval monStartTime_
void setDoCrcCheck(UInt_t doCrcCheck)
void findRcmsStateListener()
xdata::UnsignedInteger32 dataErrorFlag_