#include <BU.h>
Public Member Functions | |
void | actionPerformed (xdata::Event &e) |
BU (xdaq::ApplicationStub *s) | |
bool | building (toolbox::task::WorkLoop *wl) |
bool | configuring (toolbox::task::WorkLoop *wl) |
void | customWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception) |
bool | enabling (toolbox::task::WorkLoop *wl) |
xoap::MessageReference | fsmCallback (xoap::MessageReference msg) throw (xoap::exception::Exception) |
bool | halting (toolbox::task::WorkLoop *wl) |
void | I2O_BU_ALLOCATE_Callback (toolbox::mem::Reference *bufRef) |
void | I2O_BU_DISCARD_Callback (toolbox::mem::Reference *bufRef) |
bool | monitoring (toolbox::task::WorkLoop *wl) |
bool | sending (toolbox::task::WorkLoop *wl) |
void | startBuildingWorkLoop () throw (evf::Exception) |
void | startMonitoringWorkLoop () throw (evf::Exception) |
void | startSendingWorkLoop () throw (evf::Exception) |
bool | stopping (toolbox::task::WorkLoop *wl) |
void | webPageRequest (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception) |
XDAQ_INSTANTIATOR () | |
virtual | ~BU () |
Private Member Functions | |
toolbox::mem::Reference * | createMsgChain (evf::BUEvent *evt, unsigned int fuResourceId) |
double | deltaT (const struct timeval *start, const struct timeval *end) |
void | dumpFrame (unsigned char *data, unsigned int len) |
void | exportParameters () |
bool | generateEvent (evf::BUEvent *evt) |
void | lock () |
void | postBuild () |
void | postRqst () |
void | postSend () |
void | reset () |
void | unlock () |
void | waitBuild () |
void | waitRqst () |
void | waitSend () |
Private Attributes | |
toolbox::task::ActionSignature * | asBuilding_ |
toolbox::task::ActionSignature * | asMonitoring_ |
toolbox::task::ActionSignature * | asSending_ |
xdata::Double | average_ |
xdaq::ApplicationContext * | buAppContext_ |
xdaq::ApplicationDescriptor * | buAppDesc_ |
sem_t | buildSem_ |
std::queue< unsigned int > | builtIds_ |
xdata::String | class_ |
xdata::Boolean | crc_ |
xdata::UnsignedInteger32 | deltaN_ |
xdata::UnsignedInteger32 | deltaSumOfSizes_ |
xdata::Double | deltaSumOfSquares_ |
xdata::Double | deltaT_ |
xdata::UnsignedInteger32 | eventBufferSize_ |
std::vector< evf::BUEvent * > | events_ |
unsigned int | evtNumber_ |
unsigned int | fakeLs_ |
xdata::UnsignedInteger32 | fakeLsUpdateSecs_ |
xdata::UnsignedInteger32 | fedSizeMax_ |
xdata::UnsignedInteger32 | fedSizeMean_ |
xdata::UnsignedInteger32 | fedSizeWidth_ |
xdata::UnsignedInteger32 | firstEvent_ |
std::queue< unsigned int > | freeIds_ |
StateMachine | fsm_ |
xdaq::ApplicationDescriptor * | fuAppDesc_ |
double | gaussianMean_ |
double | gaussianWidth_ |
WebGUI * | gui_ |
xdata::String | hostname_ |
toolbox::mem::Pool * | i2oPool_ |
xdata::UnsignedInteger32 | instance_ |
bool | isBuilding_ |
bool | isHalting_ |
bool | isSending_ |
timeval | lastLsUpdate_ |
sem_t | lock_ |
Logger | log_ |
xdata::Double | memUsedInMB_ |
xdata::String | mode_ |
unsigned int | monLastN_ |
unsigned int | monLastSumOfSizes_ |
uint64_t | monLastSumOfSquares_ |
xdata::UnsignedInteger32 | monSleepSec_ |
struct timeval | monStartTime_ |
xdata::UnsignedInteger32 | msgBufferSize_ |
xdata::UnsignedInteger32 | nbEventsBuilt_ |
xdata::UnsignedInteger32 | nbEventsDiscarded_ |
xdata::UnsignedInteger32 | nbEventsInBU_ |
xdata::UnsignedInteger32 | nbEventsRequested_ |
xdata::UnsignedInteger32 | nbEventsSent_ |
xdata::Boolean | overwriteEvtId_ |
xdata::Boolean | overwriteLsId_ |
xdata::UnsignedInteger32 | queueSize_ |
xdata::Double | rate_ |
xdata::Boolean | replay_ |
xdata::Double | rms_ |
std::queue< unsigned int > | rqstIds_ |
sem_t | rqstSem_ |
xdata::UnsignedInteger32 | runNumber_ |
sem_t | sendSem_ |
std::set< unsigned int > | sentIds_ |
std::string | sourceId_ |
unsigned int | sumOfSizes_ |
uint64_t | sumOfSquares_ |
xdata::Double | throughput_ |
xdata::String | url_ |
xdata::Boolean | useFixedFedSize_ |
std::vector< unsigned int > | validFedIds_ |
toolbox::task::WorkLoop * | wlBuilding_ |
toolbox::task::WorkLoop * | wlMonitoring_ |
toolbox::task::WorkLoop * | wlSending_ |
Static Private Attributes | |
static const int | fedHeaderSize_ = sizeof(fedh_t) |
static const int | fedTrailerSize_ = sizeof(fedt_t) |
static const int | frlHeaderSize_ = sizeof(frlh_t) |
BU::BU | ( | xdaq::ApplicationStub * | s | ) |
Definition at line 36 of file BU.cc.
References class_, crc_, customWebPage(), alignCSCRings::e, Exception, exportParameters(), fedSizeMean_, fedSizeWidth_, evf::StateMachine::findRcmsStateListener(), fsm_, gaussianMean_, gaussianWidth_, gui_, hostname_, I2O_BU_ALLOCATE_Callback(), I2O_BU_DISCARD_Callback(), i2oPool_, evf::StateMachine::initialize(), instance_, create_public_lumi_plots::log, log_, mergeVDriftHistosByStation::name, evf::BUEvent::setComputeCrc(), evf::WebGUI::setLargeAppIcon(), evf::WebGUI::setSmallAppIcon(), sourceId_, mathSSE::sqrt(), startMonitoringWorkLoop(), url_, and webPageRequest().
: xdaq::Application(s) , log_(getApplicationLogger()) , buAppDesc_(getApplicationDescriptor()) , fuAppDesc_(0) , buAppContext_(getApplicationContext()) , fsm_(this) , gui_(0) , evtNumber_(0) , isBuilding_(false) , isSending_(false) , isHalting_(false) , wlBuilding_(0) , asBuilding_(0) , wlSending_(0) , asSending_(0) , wlMonitoring_(0) , asMonitoring_(0) , instance_(0) , runNumber_(0) , memUsedInMB_(0.0) , deltaT_(0.0) , deltaN_(0) , deltaSumOfSquares_(0) , deltaSumOfSizes_(0) , throughput_(0.0) , average_(0.0) , rate_(0.0) , rms_(0.0) , nbEventsInBU_(0) , nbEventsRequested_(0) , nbEventsBuilt_(0) , nbEventsSent_(0) , nbEventsDiscarded_(0) , mode_("RANDOM") , replay_(false) , crc_(true) , overwriteEvtId_(true) , overwriteLsId_(false) , fakeLsUpdateSecs_(23) , firstEvent_(1) , queueSize_(32) , eventBufferSize_(0x400000) , msgBufferSize_(32768) , fedSizeMax_(65536) , fedSizeMean_(1024) , fedSizeWidth_(1024) , useFixedFedSize_(false) , monSleepSec_(1) , fakeLs_(0) , gaussianMean_(0.0) , gaussianWidth_(1.0) , monLastN_(0) , monLastSumOfSquares_(0) , monLastSumOfSizes_(0) , sumOfSquares_(0) , sumOfSizes_(0) , i2oPool_(0) { // initialize state machine fsm_.initialize<evf::BU>(this); // initialize application info url_ = getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+ getApplicationDescriptor()->getURN(); class_ =getApplicationDescriptor()->getClassName(); instance_=getApplicationDescriptor()->getInstance(); hostname_=getApplicationDescriptor()->getContextDescriptor()->getURL(); sourceId_=class_.toString()+instance_.toString(); // i2o callbacks i2o::bind(this,&BU::I2O_BU_ALLOCATE_Callback,I2O_BU_ALLOCATE,XDAQ_ORGANIZATION_ID); i2o::bind(this,&BU::I2O_BU_DISCARD_Callback, I2O_BU_DISCARD, XDAQ_ORGANIZATION_ID); // allocate i2o memery pool string i2oPoolName=sourceId_+"_i2oPool"; try { toolbox::mem::HeapAllocator *allocator=new toolbox::mem::HeapAllocator(); toolbox::net::URN urn("toolbox-mem-pool",i2oPoolName); toolbox::mem::MemoryPoolFactory* poolFactory= toolbox::mem::getMemoryPoolFactory(); i2oPool_=poolFactory->createPool(urn,allocator); } catch (toolbox::mem::exception::Exception& e) { string s="Failed to create pool: "+i2oPoolName; LOG4CPLUS_FATAL(log_,s); XCEPT_RETHROW(xcept::Exception,s,e); } // web interface xgi::bind(this,&evf::BU::webPageRequest,"Default"); gui_=new WebGUI(this,&fsm_); gui_->setSmallAppIcon("/rubuilder/bu/images/bu32x32.gif"); gui_->setLargeAppIcon("/rubuilder/bu/images/bu64x64.gif"); vector<toolbox::lang::Method*> methods=gui_->getMethods(); vector<toolbox::lang::Method*>::iterator it; for (it=methods.begin();it!=methods.end();++it) { if ((*it)->type()=="cgi") { string name=static_cast<xgi::MethodSignature*>(*it)->name(); xgi::bind(this,&evf::BU::webPageRequest,name); } } xgi::bind(this,&evf::BU::customWebPage,"customWebPage"); // export parameters to info space(s) exportParameters(); // findRcmsStateListener fsm_.findRcmsStateListener(); // compute parameters for fed size generation (a la Emilio) gaussianMean_ =std::log((double)fedSizeMean_); gaussianWidth_=std::sqrt(std::log (0.5* (1+std::sqrt (1.0+4.0* fedSizeWidth_.value_*fedSizeWidth_.value_/ fedSizeMean_.value_/fedSizeMean_.value_)))); // start monitoring thread, once and for all startMonitoringWorkLoop(); // propagate crc flag to BUEvent BUEvent::setComputeCrc(crc_.value_); }
BU::~BU | ( | ) | [virtual] |
void BU::actionPerformed | ( | xdata::Event & | e | ) |
Definition at line 385 of file BU.cc.
References crc_, alignCSCRings::e, gui_, i2oPool_, instance, memUsedInMB_, mode_, evf::WebGUI::monInfoSpace(), and evf::BUEvent::setComputeCrc().
{ gui_->monInfoSpace()->lock(); if (e.type()=="urn:xdata-event:ItemGroupRetrieveEvent") { mode_=(0==PlaybackRawDataProvider::instance())?"RANDOM":"PLAYBACK"; if (0!=i2oPool_) memUsedInMB_=i2oPool_->getMemoryUsage().getUsed()*9.53674e-07; else memUsedInMB_=0.0; } else if (e.type()=="ItemChangedEvent") { string item=dynamic_cast<xdata::ItemChangedEvent&>(e).itemName(); if (item=="crc") BUEvent::setComputeCrc(crc_.value_); } gui_->monInfoSpace()->unlock(); }
bool BU::building | ( | toolbox::task::WorkLoop * | wl | ) |
Definition at line 441 of file BU.cc.
References builtIds_, events_, freeIds_, generateEvent(), isBuilding_, isHalting_, lock(), log_, nbEventsBuilt_, postSend(), unlock(), and waitBuild().
Referenced by startBuildingWorkLoop().
{ waitBuild(); lock(); unsigned int buResourceId=freeIds_.front(); freeIds_.pop(); unlock(); if (buResourceId>=(uint32_t)events_.size()) { LOG4CPLUS_INFO(log_,"shutdown 'building' workloop."); isBuilding_=false; return false; } if (!isHalting_) { BUEvent* evt=events_[buResourceId]; if(generateEvent(evt)) { lock(); nbEventsBuilt_++; builtIds_.push(buResourceId); unlock(); postSend(); } else { LOG4CPLUS_INFO(log_,"building:received null post"); lock(); unsigned int saveBUResourceId = buResourceId; //buResourceId = freeIds_.front(); freeIds_.pop(); freeIds_.push(saveBUResourceId); unlock(); isBuilding_=false; return false; } } return true; }
bool BU::configuring | ( | toolbox::task::WorkLoop * | wl | ) |
Definition at line 178 of file BU.cc.
References alignCSCRings::e, Exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, isHalting_, log_, lumiQueryAPI::msg, and reset().
{ isHalting_=false; try { LOG4CPLUS_INFO(log_,"Start configuring ..."); reset(); LOG4CPLUS_INFO(log_,"Finished configuring!"); fsm_.fireEvent("ConfigureDone",this); } catch (xcept::Exception &e) { string msg = "configuring FAILED: " + (string)e.what(); fsm_.fireFailed(msg,this); } return false; }
toolbox::mem::Reference * BU::createMsgChain | ( | evf::BUEvent * | evt, |
unsigned int | fuResourceId | ||
) | [private] |
remainder>0 means that a partial fed is left over from the last block
no remaining fed data
Definition at line 805 of file BU.cc.
References Association::block, buAppDesc_, evf::BUEvent::buResourceId(), alignCSCRings::e, evf::evtn::evm_board_sense(), evf::evtn::EVM_TCS_LSBLNR_OFFSET, evf::BUEvent::evtNumber(), Exception, fakeLs_, fakeLsUpdateSecs_, evf::BUEvent::fedAddr(), fedHeaderSize_, evf::BUEvent::fedId(), evf::BUEvent::fedSize(), fedTrailerSize_, frlHeaderSize_, fuAppDesc_, evf::evtn::GTPE_ORBTNR_OFFSET, i, i2oPool_, gen::k, prof2calltree::last, lastLsUpdate_, log_, FEDNumbering::MINTriggerEGTPFEDID, FEDNumbering::MINTriggerGTPFEDID, msgBufferSize_, GetRecoTauVFromDQM_MC_cff::next, evf::BUEvent::nFed(), overwriteLsId_, evf::evtn::SLINK_HALFWORD_SIZE, validFedIds_, and warning.
Referenced by sending().
{ unsigned int msgHeaderSize =sizeof(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME); unsigned int msgPayloadSize=msgBufferSize_-msgHeaderSize; if((msgPayloadSize%4)!=0) LOG4CPLUS_ERROR(log_,"Invalid Payload Size."); /*Overwrite lumisection value stored in the event*/ if (overwriteLsId_.value_) { //getting new time and increase LS if past 23 sec struct timezone tz; if (!fakeLs_) { fakeLs_++; gettimeofday(&lastLsUpdate_,&tz); } else { timeval newLsUpdate; gettimeofday(&newLsUpdate,&tz); if ((unsigned long)1000000*newLsUpdate.tv_sec+newLsUpdate.tv_usec - (unsigned long)1000000*lastLsUpdate_.tv_sec+lastLsUpdate_.tv_usec >= fakeLsUpdateSecs_.value_*1000000) { fakeLs_++; lastLsUpdate_=newLsUpdate; } } int gtpFedPos_=-1; int egtpFedPos_=-1; for (size_t k=0;k<validFedIds_.size();k++) { if (evt->fedId(k)==FEDNumbering::MINTriggerGTPFEDID) { //insert ls value into gtp fed unsigned char * fgtpAddr = evt->fedAddr(k); unsigned int fgtpSize = evt->fedSize(k); if (fgtpAddr && fgtpSize) { gtpFedPos_=(int)k; evtn::evm_board_sense(fgtpAddr,fgtpSize); *((unsigned short*)fgtpAddr +sizeof(fedh_t)/sizeof(unsigned short) + (evtn::EVM_GTFE_BLOCK*2 + evtn::EVM_TCS_LSBLNR_OFFSET)*evtn::SLINK_HALFWORD_SIZE /sizeof(unsigned short) ) = (unsigned short)fakeLs_-1; } } if (evt->fedId(k)==FEDNumbering::MINTriggerEGTPFEDID) { //insert orbit value into gtpe fed unsigned char * fegtpAddr = evt->fedAddr(egtpFedPos_); unsigned int fegtpSize = evt->fedSize(egtpFedPos_); if (fegtpAddr && fegtpSize) { egtpFedPos_=(int)k; *( (unsigned int*)fegtpAddr + evtn::GTPE_ORBTNR_OFFSET * evtn::SLINK_HALFWORD_SIZE/sizeof(unsigned int) ) = (unsigned int)(fakeLs_-1)*0x00100000; } } } if (gtpFedPos_<0) LOG4CPLUS_ERROR(log_,"Unable to find GTP FED in event!"); if (egtpFedPos_<0 && gtpFedPos_<0) LOG4CPLUS_ERROR(log_,"Unable to find GTP or GTPE FED in event!"); } toolbox::mem::Reference *head =0; toolbox::mem::Reference *tail =0; toolbox::mem::Reference *bufRef=0; I2O_MESSAGE_FRAME *stdMsg=0; I2O_PRIVATE_MESSAGE_FRAME *pvtMsg=0; I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block =0; unsigned int iFed =0; unsigned int nSuperFrag =64; unsigned int nFedPerSuperFrag=validFedIds_.size()/nSuperFrag; unsigned int nBigSuperFrags =validFedIds_.size()%nSuperFrag; if (evt->nFed()<nSuperFrag) { nSuperFrag=evt->nFed(); nFedPerSuperFrag=1; nBigSuperFrags=0; } else { nFedPerSuperFrag=evt->nFed()/nSuperFrag; nBigSuperFrags =evt->nFed()%nSuperFrag; } // loop over all super fragments for (unsigned int iSuperFrag=0;iSuperFrag<nSuperFrag;iSuperFrag++) { // compute index of last fed in this super fragment unsigned int nFed=iFed+nFedPerSuperFrag; if (iSuperFrag<nBigSuperFrags) ++nFed; // compute number of blocks in this super fragment unsigned int nBlock =0; unsigned int curbSize=frlHeaderSize_; unsigned int totSize =curbSize; for (unsigned int i=iFed;i<nFed;i++) { curbSize+=evt->fedSize(i); totSize+=evt->fedSize(i); if (curbSize>msgPayloadSize) { curbSize+=frlHeaderSize_*(curbSize/msgPayloadSize); if(curbSize%msgPayloadSize)totSize+=frlHeaderSize_*(curbSize/msgPayloadSize); else totSize+=frlHeaderSize_*((curbSize/msgPayloadSize)-1); curbSize=curbSize%msgPayloadSize; } } nBlock=totSize/msgPayloadSize+(totSize%msgPayloadSize>0 ? 1 : 0); // loop over all blocks (msgs) in the current super fragment unsigned int remainder =0; bool fedTrailerLeft=false; bool last =false; bool warning =false; unsigned char *startOfPayload=0; U32 payload(0); for(unsigned int iBlock=0;iBlock<nBlock;iBlock++) { // If last block and its partial (there can be only 0 or 1 partial) payload=msgPayloadSize; // Allocate memory for a fragment block / message try { bufRef=toolbox::mem::getMemoryPoolFactory()->getFrame(i2oPool_, msgBufferSize_); } catch(xcept::Exception &e) { LOG4CPLUS_FATAL(log_,"xdaq::frameAlloc failed"); } // Fill in the fields of the fragment block / message stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation(); pvtMsg=(I2O_PRIVATE_MESSAGE_FRAME*)stdMsg; block =(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)stdMsg; pvtMsg->XFunctionCode =I2O_FU_TAKE; pvtMsg->OrganizationID =XDAQ_ORGANIZATION_ID; stdMsg->MessageSize =(msgHeaderSize + payload) >> 2; stdMsg->Function =I2O_PRIVATE_MESSAGE; stdMsg->VersionOffset =0; stdMsg->MsgFlags =0; stdMsg->InitiatorAddress=i2o::utils::getAddressMap()->getTid(buAppDesc_); stdMsg->TargetAddress =i2o::utils::getAddressMap()->getTid(fuAppDesc_); block->buResourceId =evt->buResourceId(); block->fuTransactionId =fuResourceId; block->blockNb =iBlock; block->nbBlocksInSuperFragment=nBlock; block->superFragmentNb =iSuperFrag; block->nbSuperFragmentsInEvent=nSuperFrag; block->eventNumber =evt->evtNumber(); // Fill in payload startOfPayload =(unsigned char*)block+msgHeaderSize; frlh_t* frlHeader=(frlh_t*)startOfPayload; frlHeader->trigno=evt->evtNumber(); frlHeader->segno =iBlock; unsigned char *startOfFedBlocks=startOfPayload+frlHeaderSize_; payload -=frlHeaderSize_; frlHeader->segsize =payload; unsigned int leftspace=payload; // a fed trailer was left over from the previous block if(fedTrailerLeft) { memcpy(startOfFedBlocks, evt->fedAddr(iFed)+evt->fedSize(iFed)-fedTrailerSize_, fedTrailerSize_); startOfFedBlocks+=fedTrailerSize_; leftspace -=fedTrailerSize_; remainder =0; fedTrailerLeft =false; // if this is the last fed, adjust block (msg) size and set last=true if((iFed==nFed-1) && !last) { frlHeader->segsize-=leftspace; int msgSize=stdMsg->MessageSize << 2; msgSize -=leftspace; bufRef->setDataSize(msgSize); stdMsg->MessageSize = msgSize >> 2; frlHeader->segsize=frlHeader->segsize | FRL_LAST_SEGM; last=true; } // !! increment iFed !! iFed++; } if (remainder>0) { // the remaining fed fits entirely into the new block if(payload>=remainder) { memcpy(startOfFedBlocks, evt->fedAddr(iFed)+evt->fedSize(iFed)-remainder, remainder); startOfFedBlocks+=remainder; leftspace -=remainder; // if this is the last fed in the superfragment, earmark it if(iFed==nFed-1) { frlHeader->segsize-=leftspace; int msgSize=stdMsg->MessageSize << 2; msgSize -=leftspace; bufRef->setDataSize(msgSize); stdMsg->MessageSize = msgSize >> 2; frlHeader->segsize=frlHeader->segsize | FRL_LAST_SEGM; last=true; } // !! increment iFed !! iFed++; // start new fed -> set remainder to 0! remainder=0; } // the remaining payload fits, but not the fed trailer else if (payload>=(remainder-fedTrailerSize_)) { memcpy(startOfFedBlocks, evt->fedAddr(iFed)+evt->fedSize(iFed)-remainder, remainder-fedTrailerSize_); frlHeader->segsize=remainder-fedTrailerSize_; fedTrailerLeft =true; leftspace -=(remainder-fedTrailerSize_); remainder =fedTrailerSize_; } // the remaining payload fits only partially, fill whole block else { memcpy(startOfFedBlocks, evt->fedAddr(iFed)+evt->fedSize(iFed)-remainder,payload); remainder-=payload; leftspace =0; } } if(remainder==0) { // loop on feds while(iFed<nFed) { // if the next header does not fit, jump to following block if((int)leftspace<fedHeaderSize_) { frlHeader->segsize-=leftspace; break; } memcpy(startOfFedBlocks,evt->fedAddr(iFed),fedHeaderSize_); leftspace -=fedHeaderSize_; startOfFedBlocks+=fedHeaderSize_; // fed fits with its trailer if(evt->fedSize(iFed)-fedHeaderSize_<=leftspace) { memcpy(startOfFedBlocks, evt->fedAddr(iFed)+fedHeaderSize_, evt->fedSize(iFed)-fedHeaderSize_); leftspace -=(evt->fedSize(iFed)-fedHeaderSize_); startOfFedBlocks+=(evt->fedSize(iFed)-fedHeaderSize_); } // fed payload fits only without fed trailer else if(evt->fedSize(iFed)-fedHeaderSize_-fedTrailerSize_<=leftspace) { memcpy(startOfFedBlocks, evt->fedAddr(iFed)+fedHeaderSize_, evt->fedSize(iFed)-fedHeaderSize_-fedTrailerSize_); leftspace -=(evt->fedSize(iFed)-fedHeaderSize_-fedTrailerSize_); frlHeader->segsize-=leftspace; fedTrailerLeft =true; remainder =fedTrailerSize_; break; } // fed payload fits only partially else { memcpy(startOfFedBlocks,evt->fedAddr(iFed)+fedHeaderSize_,leftspace); remainder=evt->fedSize(iFed)-fedHeaderSize_-leftspace; leftspace=0; break; } // !! increase iFed !! iFed++; } // while (iFed<fedN_) // earmark the last block if (iFed==nFed && remainder==0 && !last) { frlHeader->segsize-=leftspace; int msgSize=stdMsg->MessageSize << 2; msgSize -=leftspace; bufRef->setDataSize(msgSize); stdMsg->MessageSize=msgSize >> 2; frlHeader->segsize =frlHeader->segsize | FRL_LAST_SEGM; last=true; } } // if (remainder==0) if(iSuperFrag==0&&iBlock==0) { // This is the first fragment block / message head=bufRef; tail=bufRef; } else { tail->setNextReference(bufRef); tail=bufRef; } if((iBlock==nBlock-1) && remainder!=0) { nBlock++; warning=true; } } // for (iBlock) // fix case where block estimate was wrong if(warning) { toolbox::mem::Reference* next=head; do { block =(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)next->getDataLocation(); if (block->superFragmentNb==iSuperFrag) block->nbBlocksInSuperFragment=nBlock; } while((next=next->getNextReference())); } } // iSuperFrag < nSuperFrag return head; // return the top of the chain }
void BU::customWebPage | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) |
Definition at line 412 of file BU.cc.
References dbtoconf::out.
Referenced by BU().
{ *out<<"<html></html>"<<endl; }
double BU::deltaT | ( | const struct timeval * | start, |
const struct timeval * | end | ||
) | [private] |
Definition at line 727 of file BU.cc.
Referenced by monitoring().
void BU::dumpFrame | ( | unsigned char * | data, |
unsigned int | len | ||
) | [private] |
Definition at line 1144 of file BU.cc.
References trackerHits::c, i, and pos.
{ char left1[20]; char left2[20]; char right1[20]; char right2[20]; printf("Byte 0 1 2 3 4 5 6 7\n"); int c(0); int pos(0); for (unsigned int i=0;i<(len/8);i++) { int rpos(0); int off(3); for (pos=0;pos<12;pos+=3) { sprintf(&left1[pos],"%2.2x ", ((unsigned char*)data)[c+off]); sprintf(&right1[rpos],"%1c", ((data[c+off] > 32)&&(data[c+off] < 127)) ? data[c+off] : '.'); sprintf (&left2[pos],"%2.2x ", ((unsigned char*)data)[c+off+4]); sprintf (&right2[rpos],"%1c", ((data[c+off+4] > 32)&&(data[c+off+4]<127)) ? data[c+off+4] : '.'); rpos++; off--; } c+=8; printf ("%4d: %s%s || %s%s %p\n", c-8, left1, left2, right1, right2, &data[c-8]); } fflush(stdout); }
bool BU::enabling | ( | toolbox::task::WorkLoop * | wl | ) |
Definition at line 197 of file BU.cc.
References alignCSCRings::e, Exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, i, FEDNumbering::inRange(), FEDNumbering::inRangeNoGT(), PlaybackRawDataProvider::instance(), isBuilding_, isHalting_, isSending_, log_, FEDNumbering::MAXFEDID, lumiQueryAPI::msg, startBuildingWorkLoop(), startSendingWorkLoop(), and validFedIds_.
{ isHalting_=false; try { LOG4CPLUS_INFO(log_,"Start enabling ..."); // determine valid fed ids (assumes Playback EP is already configured hence PBRDP::instance // not null in case we are playing back) if (0!=PlaybackRawDataProvider::instance()) { for (unsigned int i=0;i<(unsigned int)FEDNumbering::MAXFEDID+1;i++) if (FEDNumbering::inRange(i)) validFedIds_.push_back(i); } else{ for (unsigned int i=0;i<(unsigned int)FEDNumbering::MAXFEDID+1;i++) if (FEDNumbering::inRangeNoGT(i)) validFedIds_.push_back(i); } if (!isBuilding_) startBuildingWorkLoop(); if (!isSending_) startSendingWorkLoop(); LOG4CPLUS_INFO(log_,"Finished enabling!"); fsm_.fireEvent("EnableDone",this); } catch (xcept::Exception &e) { string msg = "enabling FAILED: " + (string)e.what(); fsm_.fireFailed(msg,this); } return false; }
void BU::exportParameters | ( | ) | [private] |
Definition at line 629 of file BU.cc.
References evf::WebGUI::addItemChangedListener(), evf::WebGUI::addMonitorCounter(), evf::WebGUI::addMonitorParam(), evf::WebGUI::addStandardParam(), average_, class_, crc_, deltaN_, deltaSumOfSizes_, deltaSumOfSquares_, deltaT_, eventBufferSize_, evf::WebGUI::exportParameters(), fakeLsUpdateSecs_, fedSizeMax_, fedSizeMean_, fedSizeWidth_, firstEvent_, evf::StateMachine::foundRcmsStateListener(), fsm_, gui_, hostname_, instance_, log_, memUsedInMB_, mode_, monSleepSec_, msgBufferSize_, nbEventsBuilt_, nbEventsDiscarded_, nbEventsInBU_, nbEventsRequested_, nbEventsSent_, overwriteEvtId_, overwriteLsId_, queueSize_, rate_, evf::StateMachine::rcmsStateListener(), replay_, rms_, runNumber_, evf::StateMachine::stateName(), throughput_, url_, and useFixedFedSize_.
Referenced by BU().
{ if (0==gui_) { LOG4CPLUS_ERROR(log_,"No GUI, can't export parameters"); return; } gui_->addMonitorParam("url", &url_); gui_->addMonitorParam("class", &class_); gui_->addMonitorParam("instance", &instance_); gui_->addMonitorParam("hostname", &hostname_); gui_->addMonitorParam("runNumber", &runNumber_); gui_->addMonitorParam("stateName", fsm_.stateName()); gui_->addMonitorParam("memUsedInMB", &memUsedInMB_); gui_->addMonitorParam("deltaT", &deltaT_); gui_->addMonitorParam("deltaN", &deltaN_); gui_->addMonitorParam("deltaSumOfSquares", &deltaSumOfSquares_); gui_->addMonitorParam("deltaSumOfSizes", &deltaSumOfSizes_); gui_->addMonitorParam("throughput", &throughput_); gui_->addMonitorParam("average", &average_); gui_->addMonitorParam("rate", &rate_); gui_->addMonitorParam("rms", &rms_); gui_->addMonitorCounter("nbEvtsInBU", &nbEventsInBU_); gui_->addMonitorCounter("nbEvtsRequested", &nbEventsRequested_); gui_->addMonitorCounter("nbEvtsBuilt", &nbEventsBuilt_); gui_->addMonitorCounter("nbEvtsSent", &nbEventsSent_); gui_->addMonitorCounter("nbEvtsDiscarded", &nbEventsDiscarded_); gui_->addStandardParam("mode", &mode_); gui_->addStandardParam("replay", &replay_); gui_->addStandardParam("overwriteEvtId", &overwriteEvtId_); gui_->addStandardParam("overwriteLsId", &overwriteLsId_); gui_->addStandardParam("fakeLsUpdateSecs", &fakeLsUpdateSecs_); gui_->addStandardParam("crc", &crc_); gui_->addStandardParam("firstEvent", &firstEvent_); gui_->addStandardParam("queueSize", &queueSize_); gui_->addStandardParam("eventBufferSize", &eventBufferSize_); gui_->addStandardParam("msgBufferSize", &msgBufferSize_); gui_->addStandardParam("fedSizeMax", &fedSizeMax_); gui_->addStandardParam("fedSizeMean", &fedSizeMean_); gui_->addStandardParam("fedSizeWidth", &fedSizeWidth_); gui_->addStandardParam("useFixedFedSize", &useFixedFedSize_); gui_->addStandardParam("monSleepSec", &monSleepSec_); gui_->addStandardParam("rcmsStateListener", fsm_.rcmsStateListener()); gui_->addStandardParam("foundRcmsStateListener",fsm_.foundRcmsStateListener()); gui_->exportParameters(); gui_->addItemChangedListener("crc",this); }
xoap::MessageReference BU::fsmCallback | ( | xoap::MessageReference | msg | ) | throw (xoap::exception::Exception) |
Definition at line 311 of file BU.cc.
References lumiQueryAPI::msg.
{ return fsm_.commandCallback(msg); }
bool BU::generateEvent | ( | evf::BUEvent * | evt | ) | [private] |
Definition at line 747 of file BU.cc.
References event(), fedh_struct::eventid, events_, evtNumber_, create_public_lumi_plots::exp, fedHeaderSize_, fedSizeMax_, fedSizeMean_, fedTrailerSize_, firstEvent_, gaussianMean_, gaussianWidth_, i, evf::BUEvent::initialize(), instance, PlaybackRawDataProvider::instance(), nbEventsBuilt_, overwriteEvtId_, replay_, convertSQLiteXML::runNumber, PlaybackRawDataProvider::setFreeToEof(), useFixedFedSize_, validFedIds_, evf::BUEvent::writeFed(), evf::BUEvent::writeFedHeader(), and evf::BUEvent::writeFedTrailer().
Referenced by building().
{ // replay? if (replay_.value_&&nbEventsBuilt_>=(uint32_t)events_.size()) { if (0!=PlaybackRawDataProvider::instance()) PlaybackRawDataProvider::instance()->setFreeToEof(); return true; } // PLAYBACK mode if (0!=PlaybackRawDataProvider::instance()) { unsigned int runNumber,evtNumber; FEDRawDataCollection* event= PlaybackRawDataProvider::instance()->getFEDRawData(runNumber,evtNumber); if(event == 0) return false; evt->initialize(evtNumber); for (unsigned int i=0;i<validFedIds_.size();i++) { unsigned int fedId =validFedIds_[i]; unsigned int fedSize=event->FEDData(fedId).size(); unsigned char* fedAddr=event->FEDData(fedId).data(); if (overwriteEvtId_.value_ && fedAddr != 0) { fedh_t *fedHeader=(fedh_t*)fedAddr; fedHeader->eventid=(fedHeader->eventid&0xFF000000)+(evtNumber&0x00FFFFFF); } if (fedSize>0) evt->writeFed(fedId,fedAddr,fedSize); } delete event; } // RANDOM mode else { unsigned int evtNumber=(firstEvent_+evtNumber_++)%0x1000000; evt->initialize(evtNumber); unsigned int fedSizeMin=fedHeaderSize_+fedTrailerSize_; for (unsigned int i=0;i<validFedIds_.size();i++) { unsigned int fedId(validFedIds_[i]); unsigned int fedSize(fedSizeMean_); if (!useFixedFedSize_) { double logFedSize=CLHEP::RandGauss::shoot(gaussianMean_,gaussianWidth_); fedSize=(unsigned int)(std::exp(logFedSize)); if (fedSize<fedSizeMin) fedSize=fedSizeMin; if (fedSize>fedSizeMax_) fedSize=fedSizeMax_; fedSize-=fedSize%8; } evt->writeFed(fedId,0,fedSize); evt->writeFedHeader(i); evt->writeFedTrailer(i); } } return true; }
bool BU::halting | ( | toolbox::task::WorkLoop * | wl | ) |
Definition at line 280 of file BU.cc.
References builtIds_, alignCSCRings::e, events_, Exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), freeIds_, fsm_, instance, PlaybackRawDataProvider::instance(), isBuilding_, isHalting_, isSending_, lock(), log_, lumiQueryAPI::msg, nbEventsBuilt_, postBuild(), postSend(), replay_, and unlock().
{ try { LOG4CPLUS_INFO(log_,"Start halting ..."); isHalting_=true; if (isBuilding_&&isSending_) { lock(); freeIds_.push(events_.size()); builtIds_.push(events_.size()); unlock(); postBuild(); postSend(); } if (0!=PlaybackRawDataProvider::instance()&& (!replay_.value_||nbEventsBuilt_<(uint32_t)events_.size())) { PlaybackRawDataProvider::instance()->setFreeToEof(); while (!PlaybackRawDataProvider::instance()->areFilesClosed()) usleep(1000000); usleep(100000); } LOG4CPLUS_INFO(log_,"Finished halting!"); fsm_.fireEvent("HaltDone",this); } catch (xcept::Exception &e) { string msg = "halting FAILED: " + (string)e.what(); fsm_.fireFailed(msg,this); } return false; }
void BU::I2O_BU_ALLOCATE_Callback | ( | toolbox::mem::Reference * | bufRef | ) |
Definition at line 319 of file BU.cc.
References fuAppDesc_, i, isHalting_, lock(), log_, lumiQueryAPI::msg, nbEventsInBU_, nbEventsRequested_, postRqst(), rqstIds_, and unlock().
Referenced by BU().
{ if (isHalting_) { LOG4CPLUS_WARN(log_,"Ignore BU_ALLOCATE message while halting."); bufRef->release(); return; } I2O_MESSAGE_FRAME *stdMsg; I2O_BU_ALLOCATE_MESSAGE_FRAME *msg; stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation(); msg =(I2O_BU_ALLOCATE_MESSAGE_FRAME*)stdMsg; if (0==fuAppDesc_) { I2O_TID fuTid=stdMsg->InitiatorAddress; fuAppDesc_=i2o::utils::getAddressMap()->getApplicationDescriptor(fuTid); } for (unsigned int i=0;i<msg->n;i++) { unsigned int fuResourceId=msg->allocate[i].fuTransactionId; lock(); rqstIds_.push(fuResourceId); postRqst(); nbEventsRequested_++; nbEventsInBU_++; unlock(); } bufRef->release(); }
void BU::I2O_BU_DISCARD_Callback | ( | toolbox::mem::Reference * | bufRef | ) |
Definition at line 353 of file BU.cc.
References freeIds_, isHalting_, lock(), log_, lumiQueryAPI::msg, nbEventsDiscarded_, postBuild(), query::result, sentIds_, and unlock().
Referenced by BU().
{ if (isHalting_) { LOG4CPLUS_WARN(log_,"Ignore BU_DISCARD message while halting."); bufRef->release(); return; } I2O_MESSAGE_FRAME *stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation(); I2O_BU_DISCARD_MESSAGE_FRAME*msg =(I2O_BU_DISCARD_MESSAGE_FRAME*)stdMsg; unsigned int buResourceId=msg->buResourceId[0]; lock(); int result=sentIds_.erase(buResourceId); unlock(); if (!result) { LOG4CPLUS_ERROR(log_,"can't discard unknown buResourceId '"<<buResourceId<<"'"); } else { lock(); freeIds_.push(buResourceId); nbEventsDiscarded_.value_++; unlock(); postBuild(); } bufRef->release(); }
void evf::BU::lock | ( | void | ) | [inline, private] |
Definition at line 110 of file BU.h.
References lock_.
Referenced by building(), halting(), I2O_BU_ALLOCATE_Callback(), I2O_BU_DISCARD_Callback(), monitoring(), sending(), and stopping().
{ sem_wait(&lock_); }
bool BU::monitoring | ( | toolbox::task::WorkLoop * | wl | ) |
Definition at line 562 of file BU.cc.
References average_, deltaN_, deltaSumOfSizes_, deltaSumOfSquares_, deltaT(), deltaT_, gui_, lock(), timingPdfMaker::mean, evf::WebGUI::monInfoSpace(), monLastN_, monLastSumOfSizes_, monLastSumOfSquares_, monSleepSec_, monStartTime_, nbEventsBuilt_, rate_, rms_, stor::utils::sleep(), mathSSE::sqrt(), sumOfSizes_, sumOfSquares_, throughput_, and unlock().
Referenced by startMonitoringWorkLoop().
{ struct timeval monEndTime; struct timezone timezone; gettimeofday(&monEndTime,&timezone); lock(); unsigned int monN =nbEventsBuilt_.value_; uint64_t monSumOfSquares=sumOfSquares_; unsigned int monSumOfSizes =sumOfSizes_; uint64_t deltaSumOfSquares; unlock(); gui_->monInfoSpace()->lock(); deltaT_.value_=deltaT(&monStartTime_,&monEndTime); monStartTime_=monEndTime; deltaN_.value_=monN-monLastN_; monLastN_=monN; deltaSumOfSquares=monSumOfSquares-monLastSumOfSquares_; deltaSumOfSquares_.value_=(double)deltaSumOfSquares; monLastSumOfSquares_=monSumOfSquares; deltaSumOfSizes_.value_=monSumOfSizes-monLastSumOfSizes_; monLastSumOfSizes_=monSumOfSizes; if (deltaT_.value_!=0) { throughput_=deltaSumOfSizes_.value_/deltaT_.value_; rate_ =deltaN_.value_/deltaT_.value_; } else { throughput_=0.0; rate_ =0.0; } double meanOfSquares,mean,squareOfMean,variance; if(deltaN_.value_!=0) { meanOfSquares=deltaSumOfSquares_.value_/((double)(deltaN_.value_)); mean=((double)(deltaSumOfSizes_.value_))/((double)(deltaN_.value_)); squareOfMean=mean*mean; variance=meanOfSquares-squareOfMean; average_=deltaSumOfSizes_.value_/deltaN_.value_; rms_ =std::sqrt(variance); } else { average_=0.0; rms_ =0.0; } gui_->monInfoSpace()->unlock(); ::sleep(monSleepSec_.value_); return true; }
void evf::BU::postBuild | ( | ) | [inline, private] |
Definition at line 113 of file BU.h.
References buildSem_.
Referenced by halting(), I2O_BU_DISCARD_Callback(), and stopping().
{ sem_post(&buildSem_); }
void evf::BU::postRqst | ( | ) | [inline, private] |
Definition at line 117 of file BU.h.
References rqstSem_.
Referenced by I2O_BU_ALLOCATE_Callback().
{ sem_post(&rqstSem_); }
void evf::BU::postSend | ( | ) | [inline, private] |
Definition at line 115 of file BU.h.
References sendSem_.
Referenced by building(), halting(), and stopping().
{ sem_post(&sendSem_); }
void BU::reset | ( | void | ) | [private] |
Definition at line 685 of file BU.cc.
References average_, buildSem_, builtIds_, deltaN_, deltaSumOfSizes_, deltaSumOfSquares_, deltaT_, eventBufferSize_, events_, fakeLs_, freeIds_, gui_, i, lock_, monLastN_, monLastSumOfSizes_, monLastSumOfSquares_, queueSize_, rate_, evf::WebGUI::resetCounters(), rms_, rqstIds_, rqstSem_, sendSem_, sentIds_, throughput_, and validFedIds_.
Referenced by configuring(), and stopping().
{ gui_->resetCounters(); deltaT_ =0.0; deltaN_ = 0; deltaSumOfSquares_ = 0; deltaSumOfSizes_ = 0; throughput_ =0.0; average_ = 0; rate_ = 0; rms_ = 0; monLastN_ = 0; monLastSumOfSquares_= 0; monLastSumOfSizes_ = 0; while (events_.size()) { delete events_.back(); events_.pop_back(); } while (!rqstIds_.empty()) rqstIds_.pop(); while (!freeIds_.empty()) freeIds_.pop(); while (!builtIds_.empty()) builtIds_.pop(); sentIds_.clear(); sem_init(&lock_,0,1); sem_init(&buildSem_,0,queueSize_); sem_init(&sendSem_,0,0); sem_init(&rqstSem_,0,0); for (unsigned int i=0;i<queueSize_;i++) { events_.push_back(new BUEvent(i,eventBufferSize_)); freeIds_.push(i); } validFedIds_.clear(); fakeLs_=0; }
bool BU::sending | ( | toolbox::task::WorkLoop * | wl | ) |
Definition at line 501 of file BU.cc.
References buAppContext_, buAppDesc_, builtIds_, createMsgChain(), events_, evf::BUEvent::evtSize(), fuAppDesc_, isHalting_, isSending_, lock(), log_, lumiQueryAPI::msg, nbEventsInBU_, nbEventsSent_, rqstIds_, sentIds_, sumOfSizes_, sumOfSquares_, unlock(), waitRqst(), and waitSend().
Referenced by startSendingWorkLoop().
{ waitSend(); lock(); unsigned int buResourceId=builtIds_.front(); builtIds_.pop(); unlock(); if (buResourceId>=(uint32_t)events_.size()) { LOG4CPLUS_INFO(log_,"shutdown 'sending' workloop."); isSending_=false; return false; } if (!isHalting_) { waitRqst(); lock(); unsigned int fuResourceId=rqstIds_.front(); rqstIds_.pop(); unlock(); BUEvent* evt=events_[buResourceId]; toolbox::mem::Reference* msg=createMsgChain(evt,fuResourceId); lock(); sumOfSquares_+=(uint64_t)evt->evtSize()*(uint64_t)evt->evtSize(); sumOfSizes_ +=evt->evtSize(); nbEventsInBU_--; nbEventsSent_++; sentIds_.insert(buResourceId); unlock(); buAppContext_->postFrame(msg,buAppDesc_,fuAppDesc_); } return true; }
void BU::startBuildingWorkLoop | ( | ) | throw (evf::Exception) |
Definition at line 420 of file BU.cc.
References asBuilding_, building(), alignCSCRings::e, Exception, isBuilding_, log_, lumiQueryAPI::msg, sourceId_, and wlBuilding_.
Referenced by enabling().
{ try { LOG4CPLUS_INFO(log_,"Start 'building' workloop"); wlBuilding_= toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+ "Building", "waiting"); if (!wlBuilding_->isActive()) wlBuilding_->activate(); asBuilding_=toolbox::task::bind(this,&BU::building,sourceId_+"Building"); wlBuilding_->submit(asBuilding_); isBuilding_=true; } catch (xcept::Exception& e) { string msg = "Failed to start workloop 'building'."; XCEPT_RETHROW(evf::Exception,msg,e); } }
void BU::startMonitoringWorkLoop | ( | ) | throw (evf::Exception) |
Definition at line 539 of file BU.cc.
References asMonitoring_, alignCSCRings::e, Exception, log_, monitoring(), monStartTime_, lumiQueryAPI::msg, sourceId_, and wlMonitoring_.
Referenced by BU().
{ struct timezone timezone; gettimeofday(&monStartTime_,&timezone); try { LOG4CPLUS_INFO(log_,"Start 'monitoring' workloop"); wlMonitoring_= toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+ "Monitoring", "waiting"); if (!wlMonitoring_->isActive()) wlMonitoring_->activate(); asMonitoring_=toolbox::task::bind(this,&BU::monitoring,sourceId_+"Monitoring"); wlMonitoring_->submit(asMonitoring_); } catch (xcept::Exception& e) { string msg = "Failed to start workloop 'monitoring'."; XCEPT_RETHROW(evf::Exception,msg,e); } }
void BU::startSendingWorkLoop | ( | ) | throw (evf::Exception) |
Definition at line 480 of file BU.cc.
References asSending_, alignCSCRings::e, Exception, isSending_, log_, lumiQueryAPI::msg, sending(), sourceId_, and wlSending_.
Referenced by enabling().
{ try { LOG4CPLUS_INFO(log_,"Start 'sending' workloop"); wlSending_=toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+ "Sending", "waiting"); if (!wlSending_->isActive()) wlSending_->activate(); asSending_=toolbox::task::bind(this,&BU::sending,sourceId_+"Sending"); wlSending_->submit(asSending_); isSending_=true; } catch (xcept::Exception& e) { string msg = "Failed to start workloop 'sending'."; XCEPT_RETHROW(evf::Exception,msg,e); } }
bool BU::stopping | ( | toolbox::task::WorkLoop * | wl | ) |
Definition at line 227 of file BU.cc.
References builtIds_, alignCSCRings::e, events_, Exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), freeIds_, fsm_, instance, PlaybackRawDataProvider::instance(), lock(), log_, lumiQueryAPI::msg, postBuild(), postSend(), reset(), sentIds_, stor::utils::sleep(), and unlock().
{ try { LOG4CPLUS_INFO(log_,"Start stopping :) ..."); if (0!=PlaybackRawDataProvider::instance()) { /*&& (!replay_.value_||nbEventsBuilt_<(uint32_t)events_.size())) { */ lock(); freeIds_.push(events_.size()); unlock(); postBuild(); while (!builtIds_.empty()) { LOG4CPLUS_INFO(log_,"wait to flush ... #builtIds="<<builtIds_.size()); ::sleep(1); } // let the playback go to the last event and exit PlaybackRawDataProvider::instance()->setFreeToEof(); while (!PlaybackRawDataProvider::instance()->areFilesClosed()) usleep(1000000); usleep(100000); } lock(); builtIds_.push(events_.size()); unlock(); postSend(); while (!sentIds_.empty()) { LOG4CPLUS_INFO(log_,"wait to flush ..."); ::sleep(1); } reset(); //postBuild(); /* this is not needed and should not run if reset is called if (0!=PlaybackRawDataProvider::instance()&& (replay_.value_&&nbEventsBuilt_>=(uint32_t)events_.size())) { lock(); freeIds_.push(events_.size()); unlock(); postBuild(); } */ LOG4CPLUS_INFO(log_,"Finished stopping!"); fsm_.fireEvent("StopDone",this); } catch (xcept::Exception &e) { string msg = "stopping FAILED: " + (string)e.what(); fsm_.fireFailed(msg,this); } return false; }
void evf::BU::unlock | ( | void | ) | [inline, private] |
Definition at line 111 of file BU.h.
References lock_.
Referenced by building(), halting(), I2O_BU_ALLOCATE_Callback(), I2O_BU_DISCARD_Callback(), monitoring(), sending(), and stopping().
{ sem_post(&lock_); }
void evf::BU::waitBuild | ( | ) | [inline, private] |
Definition at line 112 of file BU.h.
References buildSem_.
Referenced by building().
{ sem_wait(&buildSem_); }
void evf::BU::waitRqst | ( | ) | [inline, private] |
void evf::BU::waitSend | ( | ) | [inline, private] |
void BU::webPageRequest | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) |
Definition at line 402 of file BU.cc.
References recoMuon::in, mergeVDriftHistosByStation::name, and dbtoconf::out.
Referenced by BU().
evf::BU::XDAQ_INSTANTIATOR | ( | ) |
toolbox::task::ActionSignature* evf::BU::asBuilding_ [private] |
Definition at line 169 of file BU.h.
Referenced by startBuildingWorkLoop().
toolbox::task::ActionSignature* evf::BU::asMonitoring_ [private] |
Definition at line 177 of file BU.h.
Referenced by startMonitoringWorkLoop().
toolbox::task::ActionSignature* evf::BU::asSending_ [private] |
Definition at line 173 of file BU.h.
Referenced by startSendingWorkLoop().
xdata::Double evf::BU::average_ [private] |
Definition at line 196 of file BU.h.
Referenced by exportParameters(), monitoring(), and reset().
xdaq::ApplicationContext* evf::BU::buAppContext_ [private] |
xdaq::ApplicationDescriptor* evf::BU::buAppDesc_ [private] |
Definition at line 140 of file BU.h.
Referenced by createMsgChain(), and sending().
sem_t evf::BU::buildSem_ [private] |
Definition at line 244 of file BU.h.
Referenced by postBuild(), reset(), and waitBuild().
std::queue<unsigned int> evf::BU::builtIds_ [private] |
Definition at line 158 of file BU.h.
Referenced by building(), halting(), reset(), sending(), and stopping().
xdata::String evf::BU::class_ [private] |
Definition at line 184 of file BU.h.
Referenced by BU(), and exportParameters().
xdata::Boolean evf::BU::crc_ [private] |
Definition at line 210 of file BU.h.
Referenced by actionPerformed(), BU(), and exportParameters().
xdata::UnsignedInteger32 evf::BU::deltaN_ [private] |
Definition at line 191 of file BU.h.
Referenced by exportParameters(), monitoring(), and reset().
xdata::UnsignedInteger32 evf::BU::deltaSumOfSizes_ [private] |
Definition at line 193 of file BU.h.
Referenced by exportParameters(), monitoring(), and reset().
xdata::Double evf::BU::deltaSumOfSquares_ [private] |
Definition at line 192 of file BU.h.
Referenced by exportParameters(), monitoring(), and reset().
xdata::Double evf::BU::deltaT_ [private] |
Definition at line 190 of file BU.h.
Referenced by exportParameters(), monitoring(), and reset().
xdata::UnsignedInteger32 evf::BU::eventBufferSize_ [private] |
Definition at line 216 of file BU.h.
Referenced by exportParameters(), and reset().
std::vector<evf::BUEvent*> evf::BU::events_ [private] |
Definition at line 155 of file BU.h.
Referenced by building(), generateEvent(), halting(), reset(), sending(), stopping(), and ~BU().
unsigned int evf::BU::evtNumber_ [private] |
Definition at line 160 of file BU.h.
Referenced by generateEvent().
unsigned int evf::BU::fakeLs_ [private] |
Definition at line 224 of file BU.h.
Referenced by createMsgChain(), and reset().
xdata::UnsignedInteger32 evf::BU::fakeLsUpdateSecs_ [private] |
Definition at line 213 of file BU.h.
Referenced by createMsgChain(), and exportParameters().
const int evf::BU::fedHeaderSize_ = sizeof(fedh_t) [static, private] |
Definition at line 253 of file BU.h.
Referenced by createMsgChain(), and generateEvent().
xdata::UnsignedInteger32 evf::BU::fedSizeMax_ [private] |
Definition at line 218 of file BU.h.
Referenced by exportParameters(), and generateEvent().
xdata::UnsignedInteger32 evf::BU::fedSizeMean_ [private] |
Definition at line 219 of file BU.h.
Referenced by BU(), exportParameters(), and generateEvent().
xdata::UnsignedInteger32 evf::BU::fedSizeWidth_ [private] |
Definition at line 220 of file BU.h.
Referenced by BU(), and exportParameters().
const int evf::BU::fedTrailerSize_ = sizeof(fedt_t) [static, private] |
Definition at line 254 of file BU.h.
Referenced by createMsgChain(), and generateEvent().
xdata::UnsignedInteger32 evf::BU::firstEvent_ [private] |
Definition at line 214 of file BU.h.
Referenced by exportParameters(), and generateEvent().
std::queue<unsigned int> evf::BU::freeIds_ [private] |
Definition at line 157 of file BU.h.
Referenced by building(), halting(), I2O_BU_DISCARD_Callback(), reset(), and stopping().
const int evf::BU::frlHeaderSize_ = sizeof(frlh_t) [static, private] |
Definition at line 252 of file BU.h.
Referenced by createMsgChain().
StateMachine evf::BU::fsm_ [private] |
Definition at line 149 of file BU.h.
Referenced by BU(), configuring(), enabling(), exportParameters(), halting(), and stopping().
xdaq::ApplicationDescriptor* evf::BU::fuAppDesc_ [private] |
Definition at line 143 of file BU.h.
Referenced by createMsgChain(), I2O_BU_ALLOCATE_Callback(), and sending().
double evf::BU::gaussianMean_ [private] |
Definition at line 227 of file BU.h.
Referenced by BU(), and generateEvent().
double evf::BU::gaussianWidth_ [private] |
Definition at line 228 of file BU.h.
Referenced by BU(), and generateEvent().
WebGUI* evf::BU::gui_ [private] |
Definition at line 152 of file BU.h.
Referenced by actionPerformed(), BU(), exportParameters(), monitoring(), and reset().
xdata::String evf::BU::hostname_ [private] |
Definition at line 186 of file BU.h.
Referenced by BU(), and exportParameters().
toolbox::mem::Pool* evf::BU::i2oPool_ [private] |
Definition at line 240 of file BU.h.
Referenced by actionPerformed(), BU(), and createMsgChain().
xdata::UnsignedInteger32 evf::BU::instance_ [private] |
Definition at line 185 of file BU.h.
Referenced by BU(), and exportParameters().
bool evf::BU::isBuilding_ [private] |
Definition at line 163 of file BU.h.
Referenced by building(), enabling(), halting(), and startBuildingWorkLoop().
bool evf::BU::isHalting_ [private] |
Definition at line 165 of file BU.h.
Referenced by building(), configuring(), enabling(), halting(), I2O_BU_ALLOCATE_Callback(), I2O_BU_DISCARD_Callback(), and sending().
bool evf::BU::isSending_ [private] |
Definition at line 164 of file BU.h.
Referenced by enabling(), halting(), sending(), and startSendingWorkLoop().
timeval evf::BU::lastLsUpdate_ [private] |
Definition at line 225 of file BU.h.
Referenced by createMsgChain().
sem_t evf::BU::lock_ [private] |
Logger evf::BU::log_ [private] |
Definition at line 137 of file BU.h.
Referenced by BU(), building(), configuring(), createMsgChain(), enabling(), exportParameters(), halting(), I2O_BU_ALLOCATE_Callback(), I2O_BU_DISCARD_Callback(), sending(), startBuildingWorkLoop(), startMonitoringWorkLoop(), startSendingWorkLoop(), and stopping().
xdata::Double evf::BU::memUsedInMB_ [private] |
Definition at line 188 of file BU.h.
Referenced by actionPerformed(), and exportParameters().
xdata::String evf::BU::mode_ [private] |
Definition at line 208 of file BU.h.
Referenced by actionPerformed(), and exportParameters().
unsigned int evf::BU::monLastN_ [private] |
Definition at line 232 of file BU.h.
Referenced by monitoring(), and reset().
unsigned int evf::BU::monLastSumOfSizes_ [private] |
Definition at line 234 of file BU.h.
Referenced by monitoring(), and reset().
uint64_t evf::BU::monLastSumOfSquares_ [private] |
Definition at line 233 of file BU.h.
Referenced by monitoring(), and reset().
xdata::UnsignedInteger32 evf::BU::monSleepSec_ [private] |
Definition at line 222 of file BU.h.
Referenced by exportParameters(), and monitoring().
struct timeval evf::BU::monStartTime_ [private] |
Definition at line 231 of file BU.h.
Referenced by monitoring(), and startMonitoringWorkLoop().
xdata::UnsignedInteger32 evf::BU::msgBufferSize_ [private] |
Definition at line 217 of file BU.h.
Referenced by createMsgChain(), and exportParameters().
xdata::UnsignedInteger32 evf::BU::nbEventsBuilt_ [private] |
Definition at line 203 of file BU.h.
Referenced by building(), exportParameters(), generateEvent(), halting(), and monitoring().
xdata::UnsignedInteger32 evf::BU::nbEventsDiscarded_ [private] |
Definition at line 205 of file BU.h.
Referenced by exportParameters(), and I2O_BU_DISCARD_Callback().
xdata::UnsignedInteger32 evf::BU::nbEventsInBU_ [private] |
Definition at line 201 of file BU.h.
Referenced by exportParameters(), I2O_BU_ALLOCATE_Callback(), and sending().
xdata::UnsignedInteger32 evf::BU::nbEventsRequested_ [private] |
Definition at line 202 of file BU.h.
Referenced by exportParameters(), and I2O_BU_ALLOCATE_Callback().
xdata::UnsignedInteger32 evf::BU::nbEventsSent_ [private] |
Definition at line 204 of file BU.h.
Referenced by exportParameters(), and sending().
xdata::Boolean evf::BU::overwriteEvtId_ [private] |
Definition at line 211 of file BU.h.
Referenced by exportParameters(), and generateEvent().
xdata::Boolean evf::BU::overwriteLsId_ [private] |
Definition at line 212 of file BU.h.
Referenced by createMsgChain(), and exportParameters().
xdata::UnsignedInteger32 evf::BU::queueSize_ [private] |
Definition at line 215 of file BU.h.
Referenced by exportParameters(), and reset().
xdata::Double evf::BU::rate_ [private] |
Definition at line 197 of file BU.h.
Referenced by exportParameters(), monitoring(), and reset().
xdata::Boolean evf::BU::replay_ [private] |
Definition at line 209 of file BU.h.
Referenced by exportParameters(), generateEvent(), and halting().
xdata::Double evf::BU::rms_ [private] |
Definition at line 198 of file BU.h.
Referenced by exportParameters(), monitoring(), and reset().
std::queue<unsigned int> evf::BU::rqstIds_ [private] |
Definition at line 156 of file BU.h.
Referenced by I2O_BU_ALLOCATE_Callback(), reset(), and sending().
sem_t evf::BU::rqstSem_ [private] |
Definition at line 246 of file BU.h.
Referenced by postRqst(), reset(), and waitRqst().
xdata::UnsignedInteger32 evf::BU::runNumber_ [private] |
Definition at line 187 of file BU.h.
Referenced by exportParameters().
sem_t evf::BU::sendSem_ [private] |
Definition at line 245 of file BU.h.
Referenced by postSend(), reset(), and waitSend().
std::set<unsigned int> evf::BU::sentIds_ [private] |
Definition at line 159 of file BU.h.
Referenced by I2O_BU_DISCARD_Callback(), reset(), sending(), and stopping().
std::string evf::BU::sourceId_ [private] |
Definition at line 180 of file BU.h.
Referenced by BU(), startBuildingWorkLoop(), startMonitoringWorkLoop(), and startSendingWorkLoop().
unsigned int evf::BU::sumOfSizes_ [private] |
Definition at line 236 of file BU.h.
Referenced by monitoring(), and sending().
uint64_t evf::BU::sumOfSquares_ [private] |
Definition at line 235 of file BU.h.
Referenced by monitoring(), and sending().
xdata::Double evf::BU::throughput_ [private] |
Definition at line 195 of file BU.h.
Referenced by exportParameters(), monitoring(), and reset().
xdata::String evf::BU::url_ [private] |
Definition at line 183 of file BU.h.
Referenced by BU(), and exportParameters().
xdata::Boolean evf::BU::useFixedFedSize_ [private] |
Definition at line 221 of file BU.h.
Referenced by exportParameters(), and generateEvent().
std::vector<unsigned int> evf::BU::validFedIds_ [private] |
Definition at line 161 of file BU.h.
Referenced by createMsgChain(), enabling(), generateEvent(), and reset().
toolbox::task::WorkLoop* evf::BU::wlBuilding_ [private] |
Definition at line 168 of file BU.h.
Referenced by startBuildingWorkLoop().
toolbox::task::WorkLoop* evf::BU::wlMonitoring_ [private] |
Definition at line 176 of file BU.h.
Referenced by startMonitoringWorkLoop().
toolbox::task::WorkLoop* evf::BU::wlSending_ [private] |
Definition at line 172 of file BU.h.
Referenced by startSendingWorkLoop().