17 #include "EventFilter/FEDInterface/interface/GlobalEventNumber.icc"
19 #include "xoap/SOAPEnvelope.h"
20 #include "xoap/SOAPBody.h"
21 #include "xoap/domutils.h"
23 #include <netinet/in.h>
36 BU::BU(xdaq::ApplicationStub *
s)
37 : xdaq::Application(s)
38 , log_(getApplicationLogger())
39 , buAppDesc_(getApplicationDescriptor())
41 , buAppContext_(getApplicationContext())
59 , deltaSumOfSquares_(0)
66 , nbEventsRequested_(0)
69 , nbEventsDiscarded_(0)
73 , overwriteEvtId_(
true)
74 , overwriteLsId_(
false)
75 , fakeLsUpdateSecs_(23)
78 , eventBufferSize_(0x400000)
79 , msgBufferSize_(32768)
83 , useFixedFedSize_(
false)
89 , monLastSumOfSquares_(0)
90 , monLastSumOfSizes_(0)
100 getApplicationDescriptor()->getContextDescriptor()->getURL()+
"/"+
101 getApplicationDescriptor()->getURN();
102 class_ =getApplicationDescriptor()->getClassName();
103 instance_=getApplicationDescriptor()->getInstance();
104 hostname_=getApplicationDescriptor()->getContextDescriptor()->getURL();
114 toolbox::mem::HeapAllocator *allocator=
new toolbox::mem::HeapAllocator();
115 toolbox::net::URN urn(
"toolbox-mem-pool",i2oPoolName);
116 toolbox::mem::MemoryPoolFactory* poolFactory=
117 toolbox::mem::getMemoryPoolFactory();
118 i2oPool_=poolFactory->createPool(urn,allocator);
121 string s=
"Failed to create pool: "+i2oPoolName;
122 LOG4CPLUS_FATAL(
log_,s);
132 vector<toolbox::lang::Method*> methods=
gui_->getMethods();
133 vector<toolbox::lang::Method*>::iterator it;
134 for (it=methods.begin();it!=methods.end();++it) {
135 if ((*it)->type()==
"cgi") {
136 string name=
static_cast<xgi::MethodSignature*
>(*it)->name();
156 fedSizeMean_.value_/fedSizeMean_.value_))));
182 LOG4CPLUS_INFO(
log_,
"Start configuring ...");
184 LOG4CPLUS_INFO(
log_,
"Finished configuring!");
188 string msg =
"configuring FAILED: " + (string)e.what();
201 LOG4CPLUS_INFO(
log_,
"Start enabling ...");
214 LOG4CPLUS_INFO(
log_,
"Finished enabling!");
218 string msg =
"enabling FAILED: " + (string)e.what();
230 LOG4CPLUS_INFO(
log_,
"Start stopping :) ...");
239 LOG4CPLUS_INFO(
log_,
"wait to flush ... #builtIds="<<
builtIds_.size());
254 LOG4CPLUS_INFO(
log_,
"wait to flush ...");
268 LOG4CPLUS_INFO(
log_,
"Finished stopping!");
272 string msg =
"stopping FAILED: " + (string)e.what();
283 LOG4CPLUS_INFO(
log_,
"Start halting ...");
299 LOG4CPLUS_INFO(
log_,
"Finished halting!");
303 string msg =
"halting FAILED: " + (string)e.what();
314 return fsm_.commandCallback(
msg);
322 LOG4CPLUS_WARN(
log_,
"Ignore BU_ALLOCATE message while halting.");
327 I2O_MESSAGE_FRAME *stdMsg;
328 I2O_BU_ALLOCATE_MESSAGE_FRAME *
msg;
330 stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
331 msg =(I2O_BU_ALLOCATE_MESSAGE_FRAME*)stdMsg;
334 I2O_TID fuTid=stdMsg->InitiatorAddress;
335 fuAppDesc_=i2o::utils::getAddressMap()->getApplicationDescriptor(fuTid);
338 for (
unsigned int i=0;
i<msg->n;
i++) {
339 unsigned int fuResourceId=msg->allocate[
i].fuTransactionId;
356 LOG4CPLUS_WARN(
log_,
"Ignore BU_DISCARD message while halting.");
361 I2O_MESSAGE_FRAME *stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
362 I2O_BU_DISCARD_MESSAGE_FRAME*
msg =(I2O_BU_DISCARD_MESSAGE_FRAME*)stdMsg;
363 unsigned int buResourceId=msg->buResourceId[0];
370 LOG4CPLUS_ERROR(
log_,
"can't discard unknown buResourceId '"<<buResourceId<<
"'");
388 if (e.type()==
"urn:xdata-event:ItemGroupRetrieveEvent") {
393 else if (e.type()==
"ItemChangedEvent") {
394 string item=
dynamic_cast<xdata::ItemChangedEvent&
>(
e).itemName();
405 string name=
in->getenv(
"PATH_INFO");
406 if (name.empty()) name=
"defaultWebPage";
407 static_cast<xgi::MethodSignature*
>(gui_->getMethod(name))->invoke(
in,
out);
415 *
out<<
"<html></html>"<<endl;
423 LOG4CPLUS_INFO(
log_,
"Start 'building' workloop");
425 toolbox::task::getWorkLoopFactory()->getWorkLoop(
sourceId_+
434 string msg =
"Failed to start workloop 'building'.";
448 if (buResourceId>=(uint32_t)
events_.size()) {
449 LOG4CPLUS_INFO(
log_,
"shutdown 'building' workloop.");
465 LOG4CPLUS_INFO(
log_,
"building:received null post");
467 unsigned int saveBUResourceId = buResourceId;
483 LOG4CPLUS_INFO(
log_,
"Start 'sending' workloop");
494 string msg =
"Failed to start workloop 'sending'.";
508 if (buResourceId>=(uint32_t)
events_.size()) {
509 LOG4CPLUS_INFO(
log_,
"shutdown 'sending' workloop.");
541 struct timezone timezone;
545 LOG4CPLUS_INFO(
log_,
"Start 'monitoring' workloop");
547 toolbox::task::getWorkLoopFactory()->getWorkLoop(
sourceId_+
555 string msg =
"Failed to start workloop 'monitoring'.";
564 struct timeval monEndTime;
565 struct timezone timezone;
567 gettimeofday(&monEndTime,&timezone);
586 monLastSumOfSquares_=monSumOfSquares;
589 monLastSumOfSizes_=monSumOfSizes;
600 double meanOfSquares,
mean,squareOfMean,variance;
605 squareOfMean=mean*
mean;
606 variance=meanOfSquares-squareOfMean;
632 LOG4CPLUS_ERROR(
log_,
"No GUI, can't export parameters");
713 sem_init(&
lock_,0,1);
732 sec = end->tv_sec - start->tv_sec;
734 if(end->tv_usec > start->tv_usec) {
735 usec = end->tv_usec - start->tv_usec;
739 usec = 1000000 - ((
unsigned int )(start->tv_usec - end->tv_usec));
742 return ((
double)sec) + ((
double)usec) / 1000000.0;
763 if(
event == 0)
return false;
768 unsigned int fedSize=
event->FEDData(fedId).size();
769 unsigned char* fedAddr=
event->FEDData(fedId).data();
772 fedHeader->
eventid=(fedHeader->
eventid&0xFF000000)+(evtNumber&0x00FFFFFF);
774 if (fedSize>0) evt->
writeFed(fedId,fedAddr,fedSize);
788 fedSize=(
unsigned int)(
std::exp(logFedSize));
789 if (fedSize<fedSizeMin) fedSize=fedSizeMin;
806 unsigned int fuResourceId)
808 unsigned int msgHeaderSize =
sizeof(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME);
811 if((msgPayloadSize%4)!=0) LOG4CPLUS_ERROR(
log_,
"Invalid Payload Size.");
823 gettimeofday(&newLsUpdate,&tz);
824 if ((
unsigned long)1000000*newLsUpdate.tv_sec+newLsUpdate.tv_usec
838 unsigned char * fgtpAddr = evt->
fedAddr(
k);
839 unsigned int fgtpSize = evt->
fedSize(
k);
840 if (fgtpAddr && fgtpSize) {
843 *((
unsigned short*)fgtpAddr
844 +
sizeof(
fedh_t)/
sizeof(
unsigned short)
851 unsigned char * fegtpAddr = evt->
fedAddr(egtpFedPos_);
852 unsigned int fegtpSize = evt->
fedSize(egtpFedPos_);
853 if (fegtpAddr && fegtpSize) {
856 ) = (
unsigned int)(
fakeLs_-1)*0x00100000;
860 if (gtpFedPos_<0) LOG4CPLUS_ERROR(
log_,
"Unable to find GTP FED in event!");
861 if (egtpFedPos_<0 && gtpFedPos_<0) LOG4CPLUS_ERROR(
log_,
"Unable to find GTP or GTPE FED in event!");
864 toolbox::mem::Reference *head =0;
865 toolbox::mem::Reference *tail =0;
866 toolbox::mem::Reference *bufRef=0;
868 I2O_MESSAGE_FRAME *stdMsg=0;
869 I2O_PRIVATE_MESSAGE_FRAME *pvtMsg=0;
870 I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *
block =0;
872 unsigned int iFed =0;
873 unsigned int nSuperFrag =64;
874 unsigned int nFedPerSuperFrag=
validFedIds_.size()/nSuperFrag;
875 unsigned int nBigSuperFrags =
validFedIds_.size()%nSuperFrag;
877 if (evt->
nFed()<nSuperFrag) {
878 nSuperFrag=evt->
nFed();
884 nFedPerSuperFrag=evt->
nFed()/nSuperFrag;
885 nBigSuperFrags =evt->
nFed()%nSuperFrag;
888 for (
unsigned int iSuperFrag=0;iSuperFrag<nSuperFrag;iSuperFrag++) {
891 unsigned int nFed=iFed+nFedPerSuperFrag;
892 if (iSuperFrag<nBigSuperFrags) ++nFed;
895 unsigned int nBlock =0;
897 unsigned int totSize =curbSize;
898 for (
unsigned int i=iFed;
i<nFed;
i++) {
901 if (curbSize>msgPayloadSize) {
903 if(curbSize%msgPayloadSize)totSize+=
frlHeaderSize_*(curbSize/msgPayloadSize);
905 curbSize=curbSize%msgPayloadSize;
908 nBlock=totSize/msgPayloadSize+(totSize%msgPayloadSize>0 ? 1 : 0);
912 unsigned int remainder =0;
913 bool fedTrailerLeft=
false;
916 unsigned char *startOfPayload=0;
919 for(
unsigned int iBlock=0;iBlock<nBlock;iBlock++) {
922 payload=msgPayloadSize;
926 bufRef=toolbox::mem::getMemoryPoolFactory()->getFrame(
i2oPool_,
930 LOG4CPLUS_FATAL(
log_,
"xdaq::frameAlloc failed");
934 stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
935 pvtMsg=(I2O_PRIVATE_MESSAGE_FRAME*)stdMsg;
936 block =(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)stdMsg;
938 pvtMsg->XFunctionCode =I2O_FU_TAKE;
939 pvtMsg->OrganizationID =XDAQ_ORGANIZATION_ID;
941 stdMsg->MessageSize =(msgHeaderSize + payload) >> 2;
942 stdMsg->Function =I2O_PRIVATE_MESSAGE;
943 stdMsg->VersionOffset =0;
945 stdMsg->InitiatorAddress=i2o::utils::getAddressMap()->getTid(
buAppDesc_);
946 stdMsg->TargetAddress =i2o::utils::getAddressMap()->getTid(
fuAppDesc_);
949 block->fuTransactionId =fuResourceId;
950 block->blockNb =iBlock;
951 block->nbBlocksInSuperFragment=nBlock;
952 block->superFragmentNb =iSuperFrag;
953 block->nbSuperFragmentsInEvent=nSuperFrag;
957 startOfPayload =(
unsigned char*)block+msgHeaderSize;
958 frlh_t* frlHeader=(frlh_t*)startOfPayload;
960 frlHeader->segno =iBlock;
964 frlHeader->segsize =payload;
965 unsigned int leftspace=payload;
969 memcpy(startOfFedBlocks,
976 fedTrailerLeft =
false;
979 if((iFed==nFed-1) && !
last) {
980 frlHeader->segsize-=leftspace;
981 int msgSize=stdMsg->MessageSize << 2;
983 bufRef->setDataSize(msgSize);
984 stdMsg->MessageSize = msgSize >> 2;
985 frlHeader->segsize=frlHeader->segsize | FRL_LAST_SEGM;
999 if(payload>=remainder) {
1000 memcpy(startOfFedBlocks,
1004 startOfFedBlocks+=remainder;
1005 leftspace -=remainder;
1009 frlHeader->segsize-=leftspace;
1010 int msgSize=stdMsg->MessageSize << 2;
1011 msgSize -=leftspace;
1012 bufRef->setDataSize(msgSize);
1013 stdMsg->MessageSize = msgSize >> 2;
1014 frlHeader->segsize=frlHeader->segsize | FRL_LAST_SEGM;
1026 memcpy(startOfFedBlocks,
1031 fedTrailerLeft =
true;
1037 memcpy(startOfFedBlocks,
1054 frlHeader->segsize-=leftspace;
1065 memcpy(startOfFedBlocks,
1074 memcpy(startOfFedBlocks,
1079 frlHeader->segsize-=leftspace;
1080 fedTrailerLeft =
true;
1100 if (iFed==nFed && remainder==0 && !last) {
1101 frlHeader->segsize-=leftspace;
1102 int msgSize=stdMsg->MessageSize << 2;
1103 msgSize -=leftspace;
1104 bufRef->setDataSize(msgSize);
1105 stdMsg->MessageSize=msgSize >> 2;
1106 frlHeader->segsize =frlHeader->segsize | FRL_LAST_SEGM;
1112 if(iSuperFrag==0&&iBlock==0) {
1117 tail->setNextReference(bufRef);
1121 if((iBlock==nBlock-1) && remainder!=0) {
1130 toolbox::mem::Reference*
next=head;
1132 block =(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)next->getDataLocation();
1133 if (block->superFragmentNb==iSuperFrag)
1134 block->nbBlocksInSuperFragment=nBlock;
1135 }
while((next=next->getNextReference()));
1151 printf(
"Byte 0 1 2 3 4 5 6 7\n");
1156 for (
unsigned int i=0;
i<(len/8);
i++) {
1159 for (pos=0;pos<12;pos+=3) {
1160 sprintf(&left1[pos],
"%2.2x ",
1161 ((
unsigned char*)data)[c+off]);
1162 sprintf(&right1[rpos],
"%1c",
1163 ((data[c+off] > 32)&&(data[c+off] < 127)) ? data[c+off] :
'.');
1164 sprintf (&left2[pos],
"%2.2x ",
1165 ((
unsigned char*)data)[c+off+4]);
1166 sprintf (&right2[rpos],
"%1c",
1167 ((data[c+off+4] > 32)&&(data[c+off+4]<127)) ? data[c+off+4] :
'.');
1173 printf (
"%4d: %s%s || %s%s %p\n",
1174 c-8, left1, left2, right1, right2, &data[c-8]);
1185 XDAQ_INSTANTIATOR_IMPL(
BU)
static const char runNumber_[]
void setLargeAppIcon(CString_t &icon)
xdata::UnsignedInteger32 nbEventsInBU_
const unsigned int EVM_TCS_LSBLNR_OFFSET
void initialize(unsigned int evtNumber)
bool configuring(toolbox::task::WorkLoop *wl)
void webPageRequest(xgi::Input *in, xgi::Output *out)
xdata::Boolean overwriteEvtId_
toolbox::task::ActionSignature * asMonitoring_
xdata::Double memUsedInMB_
unsigned int evtNumber() const
static void setComputeCrc(bool computeCrc)
std::queue< unsigned int > builtIds_
void addStandardParam(CString_t &name, Param_t *param)
bool sending(toolbox::task::WorkLoop *wl)
bool writeFed(unsigned int id, unsigned char *data, unsigned int size)
xdata::UnsignedInteger32 msgBufferSize_
xdaq::ApplicationContext * buAppContext_
toolbox::mem::Reference * createMsgChain(evf::BUEvent *evt, unsigned int fuResourceId)
unsigned int buResourceId() const
virtual FEDRawDataCollection * getFEDRawData()
xdata::Boolean useFixedFedSize_
double deltaT(const struct timeval *start, const struct timeval *end)
xdata::UnsignedInteger32 fedSizeWidth_
static const int frlHeaderSize_
std::queue< unsigned int > rqstIds_
void setSmallAppIcon(CString_t &icon)
void addMonitorParam(CString_t &name, Param_t *param)
void addItemChangedListener(CString_t &name, xdata::ActionListener *l)
bool monitoring(toolbox::task::WorkLoop *wl)
xdata::UnsignedInteger32 instance_
xdata::Boolean overwriteLsId_
bool writeFedHeader(unsigned int i)
xdata::Double throughput_
xdata::UnsignedInteger32 fakeLsUpdateSecs_
const unsigned int SLINK_HALFWORD_SIZE
void startMonitoringWorkLoop()
static const int fedTrailerSize_
void customWebPage(xgi::Input *in, xgi::Output *out)
bool stopping(toolbox::task::WorkLoop *wl)
void actionPerformed(xdata::Event &e)
unsigned char * fedAddr(unsigned int i) const
xdata::Double deltaSumOfSquares_
void startBuildingWorkLoop()
xdata::Bag< xdaq2rc::ClassnameAndInstance > * rcmsStateListener()
void fireFailed(const std::string &errorMsg, void *originator)
xoap::MessageReference fsmCallback(xoap::MessageReference msg)
static PlaybackRawDataProvider * instance()
toolbox::task::ActionSignature * asBuilding_
toolbox::task::WorkLoop * wlSending_
xdata::UnsignedInteger32 nbEventsSent_
void I2O_BU_ALLOCATE_Callback(toolbox::mem::Reference *bufRef)
xdata::UnsignedInteger32 nbEventsRequested_
unsigned int evtSize() const
toolbox::task::WorkLoop * wlBuilding_
xdaq::ApplicationDescriptor * buAppDesc_
unsigned int fedId(unsigned int i) const
xdata::UnsignedInteger32 monSleepSec_
void dumpFrame(unsigned char *data, unsigned int len)
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
toolbox::task::ActionSignature * asSending_
bool writeFedTrailer(unsigned int i)
bool evm_board_sense(const unsigned char *p, size_t size)
xdata::UnsignedInteger32 deltaN_
void fireEvent(const std::string &evtType, void *originator)
std::vector< unsigned int > validFedIds_
toolbox::task::WorkLoop * wlMonitoring_
unsigned long long uint64_t
xdata::UnsignedInteger32 fedSizeMax_
xdata::String * stateName()
bool generateEvent(evf::BUEvent *evt)
bool halting(toolbox::task::WorkLoop *wl)
xdata::InfoSpace * monInfoSpace()
xdata::Boolean * foundRcmsStateListener()
toolbox::mem::Pool * i2oPool_
xdata::UnsignedInteger32 deltaSumOfSizes_
bool building(toolbox::task::WorkLoop *wl)
unsigned int fedSize(unsigned int i) const
std::vector< evf::BUEvent * > events_
xdata::UnsignedInteger32 runNumber_
xdata::UnsignedInteger32 eventBufferSize_
xdata::UnsignedInteger32 fedSizeMean_
void I2O_BU_DISCARD_Callback(toolbox::mem::Reference *bufRef)
std::queue< unsigned int > freeIds_
char data[epos_bytes_allocation]
xdaq::ApplicationDescriptor * fuAppDesc_
std::set< unsigned int > sentIds_
void startSendingWorkLoop()
unsigned int monLastSumOfSizes_
uint64_t monLastSumOfSquares_
xdata::UnsignedInteger32 nbEventsDiscarded_
void addMonitorCounter(CString_t &name, Counter_t *counter)
static const int fedHeaderSize_
xdata::UnsignedInteger32 queueSize_
unsigned int nFed() const
bool enabling(toolbox::task::WorkLoop *wl)
xdata::UnsignedInteger32 nbEventsBuilt_
struct timeval monStartTime_
static bool inRangeNoGT(int)
const unsigned int GTPE_ORBTNR_OFFSET
void findRcmsStateListener()
xdata::UnsignedInteger32 firstEvent_