20 #include "i2o/Method.h"
21 #include "interface/shared/i2oXFunctionCodes.h"
22 #include "interface/evb/i2oEVBMsgs.h"
23 #include "xcept/tools.h"
25 #include "toolbox/mem/HeapAllocator.h"
26 #include "toolbox/mem/Reference.h"
27 #include "toolbox/mem/MemoryPoolFactory.h"
28 #include "toolbox/mem/exception/Exception.h"
30 #include "xoap/MessageReference.h"
31 #include "xoap/MessageFactory.h"
32 #include "xoap/SOAPEnvelope.h"
33 #include "xoap/SOAPBody.h"
34 #include "xoap/domutils.h"
35 #include "xoap/Method.h"
37 #include "cgicc/CgiDefs.h"
38 #include "cgicc/Cgicc.h"
39 #include "cgicc/FormEntry.h"
40 #include "cgicc/HTMLClasses.h"
56 FUResourceBroker::FUResourceBroker(xdaq::ApplicationStub *
s)
57 : xdaq::Application(s)
60 , log_(getApplicationLogger())
73 , deltaSumOfSquares_(0)
79 , nbAllocatedEvents_(0)
80 , nbPendingRequests_(0)
81 , nbReceivedEvents_(0)
84 , nbSentErrorEvents_(0)
85 , nbPendingSMDiscards_(0)
86 , nbPendingSMDqmDiscards_(0)
87 , nbDiscardedEvents_(0)
89 , highestEolReceived_(0)
95 , nbTimeoutsWithEvent_(0)
96 , nbTimeoutsWithoutEvent_(0)
98 , segmentationMode_(
false)
104 , rawCellSize_(0x400000)
105 , recoCellSize_(0x800000)
106 , dqmCellSize_(0x800000)
107 , doDropEvents_(
false)
108 , doFedIdCheck_(
true)
113 , smClassName_(
"StorageManager")
115 , shmResourceTableTimeout_(200000)
119 , processKillerEnabled_(
true)
121 , reasonForFailed_(
"")
124 , nbDataDiscardReceived_(0)
125 , nbDqmDiscardReceived_(0)
127 , sumOfSquaresLast_(0)
129 , lock_(toolbox::BSem::FULL)
131 , shmInconsistent_(
false)
138 getApplicationDescriptor()->getContextDescriptor()->getURL()+
"/"+
139 getApplicationDescriptor()->getURN();
140 class_ =getApplicationDescriptor()->getClassName();
141 instance_=getApplicationDescriptor()->getInstance();
146 I2O_FU_TAKE,XDAQ_ORGANIZATION_ID);
152 I2O_EVM_LUMISECTION,XDAQ_ORGANIZATION_ID);
158 vector<toolbox::lang::Method*> methods=
gui_->getMethods();
159 vector<toolbox::lang::Method*>::iterator it;
160 for (it=methods.begin();it!=methods.end();++it) {
161 if ((*it)->type()==
"cgi") {
162 string name=
static_cast<xgi::MethodSignature*
>(*it)->name();
172 toolbox::mem::HeapAllocator *allocator=
new toolbox::mem::HeapAllocator();
173 toolbox::net::URN urn(
"toolbox-mem-pool",i2oPoolName);
174 toolbox::mem::MemoryPoolFactory* poolFactory=
175 toolbox::mem::getMemoryPoolFactory();
176 i2oPool_=poolFactory->createPool(urn,allocator);
179 string s=
"Failed to create pool: "+i2oPoolName;
180 LOG4CPLUS_FATAL(
log_,s);
191 getApplicationDescriptor()->setAttribute(
"icon",
"/evf/images/rbicon.jpg");
212 LOG4CPLUS_INFO(
log_,
"Start configuring ...");
217 std::ostringstream ost;
218 ost <<
"configuring FAILED: Inconsistency in ResourceTable - nbRaw="
223 LOG4CPLUS_INFO(
log_,
"Finished configuring!");
228 std::string
msg =
"configuring FAILED: " + (string)e.what();
241 LOG4CPLUS_INFO(
log_,
"Start enabling ...");
258 LOG4CPLUS_INFO(
log_,
"Finished enabling!");
262 std::string
msg =
"enabling FAILED: "+xcept::stdformat_exception_history(e);
275 LOG4CPLUS_INFO(
log_,
"Start stopping :) ...");
279 gettimeofday(&then,0);
282 gettimeofday(&now,0);
284 std::cout <<
"times: " << now.tv_sec <<
" " << then.tv_sec <<
" "
286 LOG4CPLUS_WARN(
log_,
"Some Process did not detach - going to Emergency stop!");
293 LOG4CPLUS_INFO(
log_,
"Finished stopping!");
298 std::string
msg =
"stopping FAILED: "+xcept::stdformat_exception_history(e);
311 LOG4CPLUS_INFO(
log_,
"Start halting ...");
321 LOG4CPLUS_INFO(
log_,count+1<<
". try to destroy resource table succeeded!");
326 LOG4CPLUS_DEBUG(
log_,count<<
". try to destroy resource table failed ...");
339 LOG4CPLUS_INFO(
log_,
"Finished halting!");
343 std::string
msg =
"halting FAILED: ResourceTable shutdown timed out.";
349 std::string
msg =
"halting FAILED: "+xcept::stdformat_exception_history(e);
362 return fsm_.commandCallback(
msg);
375 LOG4CPLUS_ERROR(
log_,
"TAKE i2o frame received in state "
385 I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *
msg =
386 (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *)bufRef->getDataLocation();
387 if(msg->lumiSection==0){
388 LOG4CPLUS_ERROR(
log_,
"EOL message received for ls=0!!! ");
394 LOG4CPLUS_ERROR(
log_,
"EOL message not in sequence, expected "
396 <<
" received " << msg->lumiSection);
400 LOG4CPLUS_WARN(
log_,
"EOL message not in sequence, expected "
402 <<
" received " << msg->lumiSection);
436 typedef set<xdaq::ApplicationDescriptor*> AppDescSet_t;
437 typedef AppDescSet_t::iterator AppDescIter_t;
440 AppDescSet_t setOfBUs=
441 getApplicationContext()->getDefaultZone()->
446 for (AppDescIter_t it=setOfBUs.begin();it!=setOfBUs.end();++it)
448 bu_=
new BUProxy(getApplicationDescriptor(),*it,
457 AppDescSet_t setOfSMs=
458 getApplicationContext()->getDefaultZone()->
463 for (AppDescIter_t it=setOfSMs.begin();it!=setOfSMs.end();++it)
465 sm_=
new SMProxy(getApplicationDescriptor(),*it,
476 string name=
in->getenv(
"PATH_INFO");
477 if (name.empty()) name=
"defaultWebPage";
478 static_cast<xgi::MethodSignature*
>(gui_->getMethod(name))->invoke(
in,
out);
491 if (e.type()==
"urn:xdata-event:ItemGroupRetrieveEvent") {
517 else if (e.type()==
"ItemChangedEvent") {
519 string item=
dynamic_cast<xdata::ItemChangedEvent&
>(e).itemName();
553 struct timezone timezone;
558 toolbox::task::getWorkLoopFactory()->getWorkLoop(
sourceId_+
"Monitoring",
566 string msg =
"Failed to start workloop 'Monitoring'.";
577 unsigned int sumOfSizes;
600 struct timeval monEndTime;
601 struct timezone timezone;
603 gettimeofday(&monEndTime,&timezone);
605 xdata::getInfoSpaceFactory()->lock();
616 sumOfSquaresLast_=sumOfSquares;
619 sumOfSizesLast_=sumOfSizes;
630 double meanOfSquares,
mean,squareOfMean,variance;
635 squareOfMean=mean*
mean;
636 variance=meanOfSquares-squareOfMean;
if(variance<0.0) variance=0.0;
647 xdata::getInfoSpaceFactory()->unlock();
660 toolbox::task::getWorkLoopFactory()->getWorkLoop(
sourceId_+
"Watching",
668 string msg =
"Failed to start workloop 'Watching'.";
688 time_t tcurr=
time(0);
689 for (
UInt_t i=0;
i<evt_tstamps.size();
i++) {
690 pid_t
pid =evt_prcids[
i];
692 time_t tstamp=evt_tstamps[
i];
if (tstamp==0)
continue;
693 double tdiff =difftime(tcurr,tstamp);
696 LOG4CPLUS_ERROR(
log_,
"evt "<<evt<<
" timed out, "<<
"kill prc "<<pid);
701 LOG4CPLUS_INFO(
log_,
"evt "<<evt<<
" under processing for more than "
709 pid_t
pid =prcids[
i];
712 LOG4CPLUS_ERROR(
log_,
"EP prc "<<pid<<
" died, send to error stream if processing.");
719 std::ostringstream ost;
720 ost <<
"Watchdog spotted inconsistency in ResourceTable - nbRaw="
724 sentinelException, ost.str());
725 notifyQualified(
"error",sentinelException);
845 const struct timeval *
end)
850 sec = end->tv_sec - start->tv_sec;
852 if(end->tv_usec > start->tv_usec) {
853 usec = end->tv_usec - start->tv_usec;
857 usec = 1000000 - ((
unsigned int )(start->tv_usec - end->tv_usec));
860 return ((
double)sec) + ((
double)usec) / 1000000.0;
869 using namespace cgicc;
871 std::vector<FormEntry>
els = cgi.getElements() ;
872 for(std::vector<FormEntry>::iterator it =
els.begin(); it !=
els.end(); it++)
873 std::cout <<
"form entry " << (*it).getValue() << std::endl;
875 std::vector<FormEntry> el1;
876 cgi.getElement(
"crcError",el1);
877 *
out<<
"<html>"<<endl;
878 gui_->htmlHead(
in,
out,sourceId_);
879 *
out<<
"<body>"<<endl;
880 gui_->htmlHeadline(
in,
out);
884 if (0!=resourceTable_) {
886 resourceTable_->injectCRCError();
888 *
out <<
"<form method=\"GET\" action=\"customWebPage\" >";
889 *
out <<
"<button name=\"crcError\" type=\"submit\" value=\"injCRC\">Inject CRC</button>" << endl;
890 *
out <<
"</form>" << endl;
891 *
out <<
"<hr/>" << std::endl;
892 vector<pid_t> client_prc_ids = resourceTable_->clientPrcIds();
893 *
out<<
table().set(
"frame",
"void").set(
"rules",
"rows")
894 .set(
"class",
"modules").set(
"width",
"250")<<endl
895 <<tr()<<th(
"Client Processes").set(
"colspan",
"3")<<tr()<<endl
897 <<th(
"client").set(
"align",
"left")
898 <<th(
"process id").set(
"align",
"center")
899 <<th(
"status").set(
"align",
"center")
902 for (
UInt_t i=0;
i<client_prc_ids.size();
i++) {
904 pid_t
pid =client_prc_ids[
i];
907 stringstream ssi; ssi<<
i+1;
908 stringstream sspid; sspid<<
pid;
909 stringstream ssstatus; ssstatus<<
status;
911 string bg_status = (status==0) ?
"#00ff00" :
"ff0000";
913 <<td(ssi.str()).set(
"align",
"left")
914 <<td(sspid.str()).set(
"align",
"center")
915 <<td(ssstatus.str()).set(
"align",
"center").set(
"bgcolor",bg_status)
919 *
out<<
"<br><br>"<<endl;
921 vector<string> states = resourceTable_->cellStates();
922 vector<UInt_t> evt_numbers = resourceTable_->cellEvtNumbers();
923 vector<pid_t> prc_ids = resourceTable_->cellPrcIds();
924 vector<time_t> time_stamps = resourceTable_->cellTimeStamps();
926 *
out<<
table().set(
"frame",
"void").set(
"rules",
"rows")
927 .set(
"class",
"modules").set(
"width",
"500")<<endl
928 <<tr()<<th(
"Shared Memory Cells").set(
"colspan",
"6")<<tr()<<endl
930 <<th(
"cell").set(
"align",
"left")
931 <<th(
"state").set(
"align",
"center")
932 <<th(
"event").set(
"align",
"center")
933 <<th(
"process id").set(
"align",
"center")
934 <<th(
"timestamp").set(
"align",
"center")
935 <<th(
"time").set(
"align",
"center")
941 pid_t
pid = prc_ids[
i];
942 time_t tstamp= time_stamps[
i];
943 double tdiff = difftime(
time(0),tstamp);
945 stringstream ssi; ssi<<
i;
946 stringstream ssevt;
if (evt!=0xffffffff) ssevt<<evt;
else ssevt<<
" - ";
947 stringstream sspid;
if (pid!=0) sspid<<
pid;
else sspid<<
" - ";
948 stringstream sststamp;
if (tstamp!=0) sststamp<<tstamp;
else sststamp<<
" - ";
949 stringstream sstdiff;
if (tstamp!=0) sstdiff<<tdiff;
else sstdiff<<
" - ";
951 string bg_state =
"#ffffff";
952 if (state==
"RAWWRITING"||state==
"RAWWRITTEN"||
953 state==
"RAWREADING"||state==
"RAWREAD")
955 else if (state==
"PROCESSING")
957 else if (state==
"PROCESSED"||state==
"RECOWRITING"||state==
"RECOWRITTEN")
959 else if (state==
"SENDING")
961 else if (state==
"SENT")
963 else if (state==
"DISCARDING")
965 else if (state==
"LUMISECTION")
969 <<td(ssi.str()).set(
"align",
"left")
970 <<td(state).set(
"align",
"center").set(
"bgcolor",bg_state)
971 <<td(ssevt.str()).set(
"align",
"center")
972 <<td(sspid.str()).set(
"align",
"center")
973 <<td(sststamp.str()).set(
"align",
"center")
974 <<td(sstdiff.str()).set(
"align",
"center")
978 *
out<<
"<br><br>"<<endl;
980 vector<string> dqmstates = resourceTable_->dqmCellStates();
982 *
out<<
table().set(
"frame",
"void").set(
"rules",
"rows")
983 .set(
"class",
"modules").set(
"width",
"500")<<endl
984 <<tr()<<th(
"Shared Memory DQM Cells").set(
"colspan",
"6")<<tr()<<endl
986 <<th(
"cell").set(
"align",
"left")
987 <<th(
"state").set(
"align",
"center")
990 for (
UInt_t i=0;
i<dqmstates.size();
i++) {
991 string state=dqmstates[
i];
993 string bg_state =
"#ffffff";
994 if (state==
"WRITING"||state==
"WRITTEN")
996 else if (state==
"SENDING")
998 else if (state==
"SENT")
1000 else if (state==
"DISCARDING")
1003 *
out<<tr()<<
"<td>"<<
i<<
"</td>"
1004 <<td(state).set(
"align",
"center").set(
"bgcolor",bg_state)
1012 *
out<<
"</body>"<<endl<<
"</html>"<<endl;
1019 LOG4CPLUS_WARN(
log_,
"in Emergency stop - handle non-clean stops");
1021 for (
UInt_t i=0;
i<client_prc_ids.size();
i++) {
1022 pid_t
pid =client_prc_ids[
i];
1023 std::cout <<
"B: killing process " <<
i <<
"pid=" << pid << std::endl;
1041 std::cout <<
"delete resourcetable" <<std::endl;
1044 std::cout <<
"cycle through resourcetable config " << std::endl;
1048 std::cout <<
"done with emergency stop" << std::endl;
static const char runNumber_[]
xdata::String clientPrcIds_
bool discardDataEvent(MemRef_t *bufRef)
xdata::UnsignedInteger32 nbLostEvents_
UInt_t nbFreeSlots() const
void addDebugCounter(CString_t &name, Counter_t *counter)
std::vector< UInt_t > cellEvtNumbers() const
xdata::UnsignedInteger32 monSleepSec_
xdata::UnsignedInteger32 nbEolPosted_
xdata::UnsignedInteger32 nbDiscardedEvents_
xdata::UnsignedInteger32 nbReceivedEvents_
xdata::UnsignedInteger32 nbPendingSMDqmDiscards_
xdata::UnsignedInteger32 nbPendingRequests_
xdata::UnsignedInteger32 nbSentDqmEvents_
xdata::UnsignedInteger32 nbRawCells_
virtual ~FUResourceBroker()
void addStandardParam(CString_t &name, Param_t *param)
bool handleCrashedEP(UInt_t runNumber, pid_t pid)
void I2O_FU_TAKE_Callback(toolbox::mem::Reference *bufRef)
void setDoDumpEvents(UInt_t doDumpEvents)
xdata::UnsignedInteger32 nbTimeoutsWithoutEvent_
void customWebPage(xgi::Input *in, xgi::Output *out)
xdata::UnsignedInteger32 nbSentEvents_
xdata::UnsignedInteger32 nbSentErrorEvents_
uint64_t sumOfSquaresLast_
void configureResources()
xdata::UnsignedInteger32 buInstance_
void startMonitoringWorkLoop()
xdata::UnsignedInteger32 nbDataDiscardReceived_
bool watching(toolbox::task::WorkLoop *wl)
bool buildResource(MemRef_t *bufRef)
void addMonitorParam(CString_t &name, Param_t *param)
void addItemChangedListener(CString_t &name, xdata::ActionListener *l)
xdata::String reasonForFailed_
void I2O_FU_DATA_DISCARD_Callback(toolbox::mem::Reference *bufRef)
#define I2O_FU_DQM_DISCARD
double deltaT(const struct timeval *start, const struct timeval *end)
static void useEvmBoard(bool useEvmBoard)
xdata::Boolean segmentationMode_
void I2O_FU_DQM_DISCARD_Callback(toolbox::mem::Reference *bufRef)
xdata::UnsignedInteger32 highestEolReceived_
xdata::UnsignedInteger32 nbDqmDiscardReceived_
xdata::UnsignedInteger32 nbDataErrors_
xdata::UnsignedInteger32 doDumpEvents_
UInt_t nbEolDiscarded() const
xdata::UnsignedInteger32 nbClients_
xdata::Bag< xdaq2rc::ClassnameAndInstance > * rcmsStateListener()
void fireFailed(const std::string &errorMsg, void *originator)
bool halting(toolbox::task::WorkLoop *wl)
xdata::UnsignedInteger32 runNumber_
bool configuring(toolbox::task::WorkLoop *wl)
xdata::UnsignedInteger32 smInstance_
UInt_t nbDiscarded() const
uint64_t sumOfSquares() const
bool isReadyToShutDown() const
toolbox::task::WorkLoop * wlMonitoring_
UInt_t nbPendingSMDqmDiscards() const
bool stopping(toolbox::task::WorkLoop *wl)
xdata::Boolean doFedIdCheck_
UInt_t sumOfSizes() const
std::vector< time_t > cellTimeStamps() const
std::vector< pid_t > clientPrcIds() const
xdata::UnsignedInteger32 nbTakeReceived_
UInt_t nbAllocated() const
xdata::UnsignedInteger32 timeOutSec_
void startSendDqmWorkLoop()
void startSendDataWorkLoop()
xdata::UnsignedInteger32 rawCellSize_
xdata::UnsignedInteger32 nbRecoCells_
UInt_t nbCrcErrors() const
xdata::UnsignedInteger32 nbDqmCells_
UInt_t nbResources() const
void setRunNumber(UInt_t runNumber)
bool enabling(toolbox::task::WorkLoop *wl)
xdata::UnsignedInteger32 nbEolDiscarded_
UInt_t nbSentError() const
void fireEvent(const std::string &evtType, void *originator)
#define I2O_FU_DATA_DISCARD
void startWatchingWorkLoop()
xdata::UnsignedInteger32 nbAllocatedEvents_
void actionPerformed(xdata::Event &e)
xdata::UnsignedInteger32 nbTimeoutsWithEvent_
xdata::Boolean processKillerEnabled_
xdata::UnsignedInteger32 nbPendingSMDiscards_
std::string clientPrcIdsAsString() const
FUResourceTable * resourceTable_
xdata::UnsignedInteger32 deltaSumOfSizes_
xdata::Boolean useEvmBoard_
UInt_t nbCompleted() const
xdata::UnsignedInteger32 dqmCellSize_
unsigned long long uint64_t
void printWorkLoopStatus()
xdata::String * stateName()
xdata::InfoSpace * monInfoSpace()
xdata::Double deltaSumOfSquares_
UInt_t nbEolPosted() const
xdata::Boolean * foundRcmsStateListener()
UInt_t nbPendingSMDiscards() const
xoap::MessageReference fsmCallback(xoap::MessageReference msg)
xdata::String smClassName_
UInt_t nbAllocSent() const
std::vector< pid_t > cellPrcIds() const
xdata::UnsignedInteger32 deltaN_
xdata::UnsignedInteger32 recoCellSize_
xdata::String buClassName_
xdata::UnsignedInteger32 watchSleepSec_
xdata::UnsignedInteger32 shmResourceTableTimeout_
static void doFedIdCheck(bool doFedIdCheck)
void I2O_EVM_LUMISECTION_Callback(toolbox::mem::Reference *bufRef)
void startDiscardWorkLoop()
void addMonitorCounter(CString_t &name, Counter_t *counter)
bool monitoring(toolbox::task::WorkLoop *wl)
void postEndOfLumiSection(MemRef_t *bufRef)
toolbox::mem::Pool * i2oPool_
xdata::UnsignedInteger32 nbReceivedEol_
xdata::Double throughput_
toolbox::task::WorkLoop * wlWatching_
xdata::Boolean doDropEvents_
xdata::UnsignedInteger32 doCrcCheck_
xdata::UnsignedInteger32 instance_
toolbox::task::ActionSignature * asMonitoring_
bool discardDqmEvent(MemRef_t *bufRef)
xdata::UnsignedInteger32 nbCrcErrors_
void webPageRequest(xgi::Input *in, xgi::Output *out)
toolbox::task::ActionSignature * asWatching_
xdata::UnsignedInteger32 nbAllocateSent_
struct timeval monStartTime_
void setDoCrcCheck(UInt_t doCrcCheck)
void findRcmsStateListener()
xdata::UnsignedInteger32 dataErrorFlag_