18 #include "xoap/SOAPEnvelope.h"
19 #include "xoap/SOAPBody.h"
20 #include "xoap/domutils.h"
22 #include <netinet/in.h>
35 BU::BU(xdaq::ApplicationStub *
s)
36 : xdaq::Application(s)
37 , log_(getApplicationLogger())
38 , buAppDesc_(getApplicationDescriptor())
40 , buAppContext_(getApplicationContext())
58 , deltaSumOfSquares_(0)
65 , nbEventsRequested_(0)
68 , nbEventsDiscarded_(0)
72 , overwriteEvtId_(
true)
75 , eventBufferSize_(0x400000)
76 , msgBufferSize_(32768)
80 , useFixedFedSize_(
false)
85 , monLastSumOfSquares_(0)
86 , monLastSumOfSizes_(0)
96 getApplicationDescriptor()->getContextDescriptor()->getURL()+
"/"+
97 getApplicationDescriptor()->getURN();
98 class_ =getApplicationDescriptor()->getClassName();
99 instance_=getApplicationDescriptor()->getInstance();
100 hostname_=getApplicationDescriptor()->getContextDescriptor()->getURL();
110 toolbox::mem::HeapAllocator *allocator=
new toolbox::mem::HeapAllocator();
111 toolbox::net::URN urn(
"toolbox-mem-pool",i2oPoolName);
112 toolbox::mem::MemoryPoolFactory* poolFactory=
113 toolbox::mem::getMemoryPoolFactory();
114 i2oPool_=poolFactory->createPool(urn,allocator);
117 string s=
"Failed to create pool: "+i2oPoolName;
118 LOG4CPLUS_FATAL(
log_,s);
128 vector<toolbox::lang::Method*> methods=
gui_->getMethods();
129 vector<toolbox::lang::Method*>::iterator it;
130 for (it=methods.begin();it!=methods.end();++it) {
131 if ((*it)->type()==
"cgi") {
132 string name=
static_cast<xgi::MethodSignature*
>(*it)->name();
152 fedSizeMean_.value_/fedSizeMean_.value_))));
178 LOG4CPLUS_INFO(
log_,
"Start configuring ...");
180 LOG4CPLUS_INFO(
log_,
"Finished configuring!");
184 string msg =
"configuring FAILED: " + (string)e.what();
197 LOG4CPLUS_INFO(
log_,
"Start enabling ...");
210 LOG4CPLUS_INFO(
log_,
"Finished enabling!");
214 string msg =
"enabling FAILED: " + (string)e.what();
226 LOG4CPLUS_INFO(
log_,
"Start stopping :) ...");
235 LOG4CPLUS_INFO(
log_,
"wait to flush ... #builtIds="<<
builtIds_.size());
248 LOG4CPLUS_INFO(
log_,
"wait to flush ...");
258 LOG4CPLUS_INFO(
log_,
"Finished stopping!");
262 string msg =
"stopping FAILED: " + (string)e.what();
273 LOG4CPLUS_INFO(
log_,
"Start halting ...");
283 LOG4CPLUS_INFO(
log_,
"Finished halting!");
287 string msg =
"halting FAILED: " + (string)e.what();
299 return fsm_.commandCallback(
msg);
307 LOG4CPLUS_WARN(
log_,
"Ignore BU_ALLOCATE message while halting.");
312 I2O_MESSAGE_FRAME *stdMsg;
313 I2O_BU_ALLOCATE_MESSAGE_FRAME *
msg;
315 stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
316 msg =(I2O_BU_ALLOCATE_MESSAGE_FRAME*)stdMsg;
319 I2O_TID fuTid=stdMsg->InitiatorAddress;
320 fuAppDesc_=i2o::utils::getAddressMap()->getApplicationDescriptor(fuTid);
323 for (
unsigned int i=0;
i<msg->n;
i++) {
324 unsigned int fuResourceId=msg->allocate[
i].fuTransactionId;
341 LOG4CPLUS_WARN(
log_,
"Ignore BU_DISCARD message while halting.");
346 I2O_MESSAGE_FRAME *stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
347 I2O_BU_DISCARD_MESSAGE_FRAME*
msg =(I2O_BU_DISCARD_MESSAGE_FRAME*)stdMsg;
348 unsigned int buResourceId=msg->buResourceId[0];
355 LOG4CPLUS_ERROR(
log_,
"can't discard unknown buResourceId '"<<buResourceId<<
"'");
373 if (e.type()==
"urn:xdata-event:ItemGroupRetrieveEvent") {
378 else if (e.type()==
"ItemChangedEvent") {
379 string item=
dynamic_cast<xdata::ItemChangedEvent&
>(
e).itemName();
390 string name=
in->getenv(
"PATH_INFO");
391 if (name.empty()) name=
"defaultWebPage";
392 static_cast<xgi::MethodSignature*
>(gui_->getMethod(name))->invoke(
in,
out);
400 *
out<<
"<html></html>"<<endl;
408 LOG4CPLUS_INFO(
log_,
"Start 'building' workloop");
410 toolbox::task::getWorkLoopFactory()->getWorkLoop(
sourceId_+
419 string msg =
"Failed to start workloop 'building'.";
433 if (buResourceId>=(uint32_t)
events_.size()) {
434 LOG4CPLUS_INFO(
log_,
"shutdown 'building' workloop.");
450 LOG4CPLUS_INFO(
log_,
"building:received null post");
452 unsigned int saveBUResourceId = buResourceId;
468 LOG4CPLUS_INFO(
log_,
"Start 'sending' workloop");
478 string msg =
"Failed to start workloop 'sending'.";
492 if (buResourceId>=(uint32_t)
events_.size()) {
493 LOG4CPLUS_INFO(
log_,
"shutdown 'sending' workloop.");
525 struct timezone timezone;
529 LOG4CPLUS_INFO(
log_,
"Start 'monitoring' workloop");
531 toolbox::task::getWorkLoopFactory()->getWorkLoop(
sourceId_+
539 string msg =
"Failed to start workloop 'monitoring'.";
548 struct timeval monEndTime;
549 struct timezone timezone;
551 gettimeofday(&monEndTime,&timezone);
570 monLastSumOfSquares_=monSumOfSquares;
573 monLastSumOfSizes_=monSumOfSizes;
584 double meanOfSquares,
mean,squareOfMean,variance;
589 squareOfMean=mean*
mean;
590 variance=meanOfSquares-squareOfMean;
616 LOG4CPLUS_ERROR(
log_,
"No GUI, can't export parameters");
695 sem_init(&
lock_,0,1);
713 sec = end->tv_sec - start->tv_sec;
715 if(end->tv_usec > start->tv_usec) {
716 usec = end->tv_usec - start->tv_usec;
720 usec = 1000000 - ((
unsigned int )(start->tv_usec - end->tv_usec));
723 return ((
double)sec) + ((
double)usec) / 1000000.0;
743 if(
event == 0)
return false;
748 unsigned int fedSize=
event->FEDData(fedId).size();
749 unsigned char* fedAddr=
event->FEDData(fedId).data();
752 fedHeader->
eventid=(fedHeader->
eventid&0xFF000000)+(evtNumber&0x00FFFFFF);
754 if (fedSize>0) evt->
writeFed(fedId,fedAddr,fedSize);
768 fedSize=(
unsigned int)(
std::exp(logFedSize));
769 if (fedSize<fedSizeMin) fedSize=fedSizeMin;
786 unsigned int fuResourceId)
788 unsigned int msgHeaderSize =
sizeof(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME);
791 if((msgPayloadSize%4)!=0) LOG4CPLUS_ERROR(
log_,
"Invalid Payload Size.");
793 toolbox::mem::Reference *head =0;
794 toolbox::mem::Reference *tail =0;
795 toolbox::mem::Reference *bufRef=0;
797 I2O_MESSAGE_FRAME *stdMsg=0;
798 I2O_PRIVATE_MESSAGE_FRAME *pvtMsg=0;
799 I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *
block =0;
801 unsigned int iFed =0;
802 unsigned int nSuperFrag =64;
803 unsigned int nFedPerSuperFrag=
validFedIds_.size()/nSuperFrag;
804 unsigned int nBigSuperFrags =
validFedIds_.size()%nSuperFrag;
806 if (evt->
nFed()<nSuperFrag) {
807 nSuperFrag=evt->
nFed();
813 nFedPerSuperFrag=evt->
nFed()/nSuperFrag;
814 nBigSuperFrags =evt->
nFed()%nSuperFrag;
817 for (
unsigned int iSuperFrag=0;iSuperFrag<nSuperFrag;iSuperFrag++) {
820 unsigned int nFed=iFed+nFedPerSuperFrag;
821 if (iSuperFrag<nBigSuperFrags) ++nFed;
824 unsigned int nBlock =0;
826 unsigned int totSize =curbSize;
827 for (
unsigned int i=iFed;
i<nFed;
i++) {
830 if (curbSize>msgPayloadSize) {
832 if(curbSize%msgPayloadSize)totSize+=
frlHeaderSize_*(curbSize/msgPayloadSize);
834 curbSize=curbSize%msgPayloadSize;
837 nBlock=totSize/msgPayloadSize+(totSize%msgPayloadSize>0 ? 1 : 0);
841 unsigned int remainder =0;
842 bool fedTrailerLeft=
false;
845 unsigned char *startOfPayload=0;
848 for(
unsigned int iBlock=0;iBlock<nBlock;iBlock++) {
851 payload=msgPayloadSize;
855 bufRef=toolbox::mem::getMemoryPoolFactory()->getFrame(
i2oPool_,
859 LOG4CPLUS_FATAL(
log_,
"xdaq::frameAlloc failed");
863 stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
864 pvtMsg=(I2O_PRIVATE_MESSAGE_FRAME*)stdMsg;
865 block =(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)stdMsg;
867 pvtMsg->XFunctionCode =I2O_FU_TAKE;
868 pvtMsg->OrganizationID =XDAQ_ORGANIZATION_ID;
870 stdMsg->MessageSize =(msgHeaderSize + payload) >> 2;
871 stdMsg->Function =I2O_PRIVATE_MESSAGE;
872 stdMsg->VersionOffset =0;
874 stdMsg->InitiatorAddress=i2o::utils::getAddressMap()->getTid(
buAppDesc_);
875 stdMsg->TargetAddress =i2o::utils::getAddressMap()->getTid(
fuAppDesc_);
878 block->fuTransactionId =fuResourceId;
879 block->blockNb =iBlock;
880 block->nbBlocksInSuperFragment=nBlock;
881 block->superFragmentNb =iSuperFrag;
882 block->nbSuperFragmentsInEvent=nSuperFrag;
886 startOfPayload =(
unsigned char*)block+msgHeaderSize;
887 frlh_t* frlHeader=(frlh_t*)startOfPayload;
889 frlHeader->segno =iBlock;
893 frlHeader->segsize =payload;
894 unsigned int leftspace=payload;
898 memcpy(startOfFedBlocks,
905 fedTrailerLeft =
false;
908 if((iFed==nFed-1) && !
last) {
909 frlHeader->segsize-=leftspace;
910 int msgSize=stdMsg->MessageSize << 2;
912 bufRef->setDataSize(msgSize);
913 stdMsg->MessageSize = msgSize >> 2;
914 frlHeader->segsize=frlHeader->segsize | FRL_LAST_SEGM;
928 if(payload>=remainder) {
929 memcpy(startOfFedBlocks,
933 startOfFedBlocks+=remainder;
934 leftspace -=remainder;
938 frlHeader->segsize-=leftspace;
939 int msgSize=stdMsg->MessageSize << 2;
941 bufRef->setDataSize(msgSize);
942 stdMsg->MessageSize = msgSize >> 2;
943 frlHeader->segsize=frlHeader->segsize | FRL_LAST_SEGM;
955 memcpy(startOfFedBlocks,
960 fedTrailerLeft =
true;
966 memcpy(startOfFedBlocks,
983 frlHeader->segsize-=leftspace;
994 memcpy(startOfFedBlocks,
1003 memcpy(startOfFedBlocks,
1008 frlHeader->segsize-=leftspace;
1009 fedTrailerLeft =
true;
1029 if (iFed==nFed && remainder==0 && !last) {
1030 frlHeader->segsize-=leftspace;
1031 int msgSize=stdMsg->MessageSize << 2;
1032 msgSize -=leftspace;
1033 bufRef->setDataSize(msgSize);
1034 stdMsg->MessageSize=msgSize >> 2;
1035 frlHeader->segsize =frlHeader->segsize | FRL_LAST_SEGM;
1041 if(iSuperFrag==0&&iBlock==0) {
1046 tail->setNextReference(bufRef);
1050 if((iBlock==nBlock-1) && remainder!=0) {
1059 toolbox::mem::Reference* next=head;
1061 block =(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)next->getDataLocation();
1062 if (block->superFragmentNb==iSuperFrag)
1063 block->nbBlocksInSuperFragment=nBlock;
1064 }
while((next=next->getNextReference()));
1080 printf(
"Byte 0 1 2 3 4 5 6 7\n");
1085 for (
unsigned int i=0;
i<(len/8);
i++) {
1088 for (pos=0;pos<12;pos+=3) {
1089 sprintf(&left1[pos],
"%2.2x ",
1090 ((
unsigned char*)data)[c+off]);
1091 sprintf(&right1[rpos],
"%1c",
1092 ((data[c+off] > 32)&&(data[c+off] < 127)) ? data[c+off] :
'.');
1093 sprintf (&left2[pos],
"%2.2x ",
1094 ((
unsigned char*)data)[c+off+4]);
1095 sprintf (&right2[rpos],
"%1c",
1096 ((data[c+off+4] > 32)&&(data[c+off+4]<127)) ? data[c+off+4] :
'.');
1102 printf (
"%4d: %s%s || %s%s %p\n",
1103 c-8, left1, left2, right1, right2, &data[c-8]);
1114 XDAQ_INSTANTIATOR_IMPL(
BU)
static const char runNumber_[]
void setLargeAppIcon(CString_t &icon)
xdata::UnsignedInteger32 nbEventsInBU_
void initialize(unsigned int evtNumber)
bool configuring(toolbox::task::WorkLoop *wl)
void webPageRequest(xgi::Input *in, xgi::Output *out)
xdata::Boolean overwriteEvtId_
toolbox::task::ActionSignature * asMonitoring_
xdata::Double memUsedInMB_
unsigned int evtNumber() const
static void setComputeCrc(bool computeCrc)
std::queue< unsigned int > builtIds_
void addStandardParam(CString_t &name, Param_t *param)
bool sending(toolbox::task::WorkLoop *wl)
bool writeFed(unsigned int id, unsigned char *data, unsigned int size)
xdata::UnsignedInteger32 msgBufferSize_
xdaq::ApplicationContext * buAppContext_
toolbox::mem::Reference * createMsgChain(evf::BUEvent *evt, unsigned int fuResourceId)
unsigned int buResourceId() const
virtual FEDRawDataCollection * getFEDRawData()
xdata::Boolean useFixedFedSize_
double deltaT(const struct timeval *start, const struct timeval *end)
xdata::UnsignedInteger32 fedSizeWidth_
static const int frlHeaderSize_
std::queue< unsigned int > rqstIds_
Exp< T >::type exp(const T &t)
void setSmallAppIcon(CString_t &icon)
void addMonitorParam(CString_t &name, Param_t *param)
void addItemChangedListener(CString_t &name, xdata::ActionListener *l)
bool monitoring(toolbox::task::WorkLoop *wl)
xdata::UnsignedInteger32 instance_
bool writeFedHeader(unsigned int i)
xdata::Double throughput_
void startMonitoringWorkLoop()
static const int fedTrailerSize_
void customWebPage(xgi::Input *in, xgi::Output *out)
bool stopping(toolbox::task::WorkLoop *wl)
void actionPerformed(xdata::Event &e)
unsigned char * fedAddr(unsigned int i) const
xdata::Double deltaSumOfSquares_
void startBuildingWorkLoop()
xdata::Bag< xdaq2rc::ClassnameAndInstance > * rcmsStateListener()
void fireFailed(const std::string &errorMsg, void *originator)
xoap::MessageReference fsmCallback(xoap::MessageReference msg)
static PlaybackRawDataProvider * instance()
toolbox::task::ActionSignature * asBuilding_
toolbox::task::WorkLoop * wlSending_
xdata::UnsignedInteger32 nbEventsSent_
void I2O_BU_ALLOCATE_Callback(toolbox::mem::Reference *bufRef)
xdata::UnsignedInteger32 nbEventsRequested_
unsigned int evtSize() const
toolbox::task::WorkLoop * wlBuilding_
xdaq::ApplicationDescriptor * buAppDesc_
xdata::UnsignedInteger32 monSleepSec_
void dumpFrame(unsigned char *data, unsigned int len)
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
block
Formating index page's pieces.
toolbox::task::ActionSignature * asSending_
bool writeFedTrailer(unsigned int i)
xdata::UnsignedInteger32 deltaN_
void fireEvent(const std::string &evtType, void *originator)
std::vector< unsigned int > validFedIds_
toolbox::task::WorkLoop * wlMonitoring_
Log< T >::type log(const T &t)
unsigned long long uint64_t
xdata::UnsignedInteger32 fedSizeMax_
xdata::String * stateName()
bool generateEvent(evf::BUEvent *evt)
bool halting(toolbox::task::WorkLoop *wl)
xdata::InfoSpace * monInfoSpace()
xdata::Boolean * foundRcmsStateListener()
toolbox::mem::Pool * i2oPool_
xdata::UnsignedInteger32 deltaSumOfSizes_
bool building(toolbox::task::WorkLoop *wl)
unsigned int fedSize(unsigned int i) const
std::vector< evf::BUEvent * > events_
xdata::UnsignedInteger32 runNumber_
xdata::UnsignedInteger32 eventBufferSize_
xdata::UnsignedInteger32 fedSizeMean_
void I2O_BU_DISCARD_Callback(toolbox::mem::Reference *bufRef)
std::queue< unsigned int > freeIds_
xdaq::ApplicationDescriptor * fuAppDesc_
std::set< unsigned int > sentIds_
void startSendingWorkLoop()
unsigned int monLastSumOfSizes_
uint64_t monLastSumOfSquares_
xdata::UnsignedInteger32 nbEventsDiscarded_
void addMonitorCounter(CString_t &name, Counter_t *counter)
static const int fedHeaderSize_
xdata::UnsignedInteger32 queueSize_
unsigned int nFed() const
bool enabling(toolbox::task::WorkLoop *wl)
xdata::UnsignedInteger32 nbEventsBuilt_
struct timeval monStartTime_
static bool inRangeNoGT(int)
void findRcmsStateListener()
xdata::UnsignedInteger32 firstEvent_