CMS 3D CMS Logo

Public Member Functions | Private Member Functions | Private Attributes | Static Private Attributes

evf::BU Class Reference

#include <BU.h>

List of all members.

Public Member Functions

void actionPerformed (xdata::Event &e)
 BU (xdaq::ApplicationStub *s)
bool building (toolbox::task::WorkLoop *wl)
bool configuring (toolbox::task::WorkLoop *wl)
void customWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
bool enabling (toolbox::task::WorkLoop *wl)
xoap::MessageReference fsmCallback (xoap::MessageReference msg) throw (xoap::exception::Exception)
bool halting (toolbox::task::WorkLoop *wl)
void I2O_BU_ALLOCATE_Callback (toolbox::mem::Reference *bufRef)
void I2O_BU_DISCARD_Callback (toolbox::mem::Reference *bufRef)
bool monitoring (toolbox::task::WorkLoop *wl)
bool sending (toolbox::task::WorkLoop *wl)
void startBuildingWorkLoop () throw (evf::Exception)
void startMonitoringWorkLoop () throw (evf::Exception)
void startSendingWorkLoop () throw (evf::Exception)
bool stopping (toolbox::task::WorkLoop *wl)
void webPageRequest (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 XDAQ_INSTANTIATOR ()
virtual ~BU ()

Private Member Functions

toolbox::mem::Reference * createMsgChain (evf::BUEvent *evt, unsigned int fuResourceId)
double deltaT (const struct timeval *start, const struct timeval *end)
void dumpFrame (unsigned char *data, unsigned int len)
void exportParameters ()
bool generateEvent (evf::BUEvent *evt)
void lock ()
void postBuild ()
void postRqst ()
void postSend ()
void reset ()
void unlock ()
void waitBuild ()
void waitRqst ()
void waitSend ()

Private Attributes

toolbox::task::ActionSignature * asBuilding_
toolbox::task::ActionSignature * asMonitoring_
toolbox::task::ActionSignature * asSending_
xdata::Double average_
xdaq::ApplicationContext * buAppContext_
xdaq::ApplicationDescriptor * buAppDesc_
sem_t buildSem_
std::queue< unsigned int > builtIds_
xdata::String class_
xdata::Boolean crc_
xdata::UnsignedInteger32 deltaN_
xdata::UnsignedInteger32 deltaSumOfSizes_
xdata::Double deltaSumOfSquares_
xdata::Double deltaT_
xdata::UnsignedInteger32 eventBufferSize_
std::vector< evf::BUEvent * > events_
unsigned int evtNumber_
unsigned int fakeLs_
xdata::UnsignedInteger32 fakeLsUpdateSecs_
xdata::UnsignedInteger32 fedSizeMax_
xdata::UnsignedInteger32 fedSizeMean_
xdata::UnsignedInteger32 fedSizeWidth_
xdata::UnsignedInteger32 firstEvent_
std::queue< unsigned int > freeIds_
StateMachine fsm_
xdaq::ApplicationDescriptor * fuAppDesc_
double gaussianMean_
double gaussianWidth_
WebGUIgui_
xdata::String hostname_
toolbox::mem::Pool * i2oPool_
xdata::UnsignedInteger32 instance_
bool isBuilding_
bool isHalting_
bool isSending_
timeval lastLsUpdate_
sem_t lock_
Logger log_
xdata::Double memUsedInMB_
xdata::String mode_
unsigned int monLastN_
unsigned int monLastSumOfSizes_
uint64_t monLastSumOfSquares_
xdata::UnsignedInteger32 monSleepSec_
struct timeval monStartTime_
xdata::UnsignedInteger32 msgBufferSize_
xdata::UnsignedInteger32 nbEventsBuilt_
xdata::UnsignedInteger32 nbEventsDiscarded_
xdata::UnsignedInteger32 nbEventsInBU_
xdata::UnsignedInteger32 nbEventsRequested_
xdata::UnsignedInteger32 nbEventsSent_
xdata::Boolean overwriteEvtId_
xdata::Boolean overwriteLsId_
xdata::UnsignedInteger32 queueSize_
xdata::Double rate_
xdata::Boolean replay_
xdata::Double rms_
std::queue< unsigned int > rqstIds_
sem_t rqstSem_
xdata::UnsignedInteger32 runNumber_
sem_t sendSem_
std::set< unsigned int > sentIds_
std::string sourceId_
unsigned int sumOfSizes_
uint64_t sumOfSquares_
xdata::Double throughput_
xdata::String url_
xdata::Boolean useFixedFedSize_
std::vector< unsigned int > validFedIds_
toolbox::task::WorkLoop * wlBuilding_
toolbox::task::WorkLoop * wlMonitoring_
toolbox::task::WorkLoop * wlSending_

Static Private Attributes

static const int fedHeaderSize_ = sizeof(fedh_t)
static const int fedTrailerSize_ = sizeof(fedt_t)
static const int frlHeaderSize_ = sizeof(frlh_t)

Detailed Description

Definition at line 49 of file BU.h.


Constructor & Destructor Documentation

BU::BU ( xdaq::ApplicationStub *  s)

Definition at line 36 of file BU.cc.

References class_, crc_, customWebPage(), alignCSCRings::e, Exception, exportParameters(), fedSizeMean_, fedSizeWidth_, evf::StateMachine::findRcmsStateListener(), fsm_, gaussianMean_, gaussianWidth_, gui_, hostname_, I2O_BU_ALLOCATE_Callback(), I2O_BU_DISCARD_Callback(), i2oPool_, evf::StateMachine::initialize(), instance_, funct::log(), log_, mergeVDriftHistosByStation::name, evf::BUEvent::setComputeCrc(), evf::WebGUI::setLargeAppIcon(), evf::WebGUI::setSmallAppIcon(), sourceId_, mathSSE::sqrt(), startMonitoringWorkLoop(), url_, and webPageRequest().

  : xdaq::Application(s)
  , log_(getApplicationLogger())
  , buAppDesc_(getApplicationDescriptor())
  , fuAppDesc_(0)
  , buAppContext_(getApplicationContext())
  , fsm_(this)
  , gui_(0)
  , evtNumber_(0)
  , isBuilding_(false)
  , isSending_(false)
  , isHalting_(false)
  , wlBuilding_(0)
  , asBuilding_(0)
  , wlSending_(0)
  , asSending_(0)
  , wlMonitoring_(0)
  , asMonitoring_(0)
  , instance_(0)
  , runNumber_(0)
  , memUsedInMB_(0.0)
  , deltaT_(0.0)
  , deltaN_(0)
  , deltaSumOfSquares_(0)
  , deltaSumOfSizes_(0)
  , throughput_(0.0)
  , average_(0.0)
  , rate_(0.0)
  , rms_(0.0)
  , nbEventsInBU_(0)
  , nbEventsRequested_(0)
  , nbEventsBuilt_(0)
  , nbEventsSent_(0)
  , nbEventsDiscarded_(0)
  , mode_("RANDOM")
  , replay_(false)
  , crc_(true)
  , overwriteEvtId_(true)
  , overwriteLsId_(false)
  , fakeLsUpdateSecs_(23)
  , firstEvent_(1)
  , queueSize_(32)
  , eventBufferSize_(0x400000)
  , msgBufferSize_(32768)
  , fedSizeMax_(65536)
  , fedSizeMean_(1024)
  , fedSizeWidth_(1024)
  , useFixedFedSize_(false)
  , monSleepSec_(1)
  , fakeLs_(0)
  , gaussianMean_(0.0)
  , gaussianWidth_(1.0)
  , monLastN_(0)
  , monLastSumOfSquares_(0)
  , monLastSumOfSizes_(0)
  , sumOfSquares_(0)
  , sumOfSizes_(0)
  , i2oPool_(0)
{
  // initialize state machine
  fsm_.initialize<evf::BU>(this);
  
  // initialize application info
  url_     =
    getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
    getApplicationDescriptor()->getURN();
  class_   =getApplicationDescriptor()->getClassName();
  instance_=getApplicationDescriptor()->getInstance();
  hostname_=getApplicationDescriptor()->getContextDescriptor()->getURL();
  sourceId_=class_.toString()+instance_.toString();
  
  // i2o callbacks
  i2o::bind(this,&BU::I2O_BU_ALLOCATE_Callback,I2O_BU_ALLOCATE,XDAQ_ORGANIZATION_ID);
  i2o::bind(this,&BU::I2O_BU_DISCARD_Callback, I2O_BU_DISCARD, XDAQ_ORGANIZATION_ID);
  
  // allocate i2o memery pool
  string i2oPoolName=sourceId_+"_i2oPool";
  try {
    toolbox::mem::HeapAllocator *allocator=new toolbox::mem::HeapAllocator();
    toolbox::net::URN urn("toolbox-mem-pool",i2oPoolName);
    toolbox::mem::MemoryPoolFactory* poolFactory=
      toolbox::mem::getMemoryPoolFactory();
    i2oPool_=poolFactory->createPool(urn,allocator);
  }
  catch (toolbox::mem::exception::Exception& e) {
    string s="Failed to create pool: "+i2oPoolName;
    LOG4CPLUS_FATAL(log_,s);
    XCEPT_RETHROW(xcept::Exception,s,e);
  }
  
  // web interface
  xgi::bind(this,&evf::BU::webPageRequest,"Default");
  gui_=new WebGUI(this,&fsm_);
  gui_->setSmallAppIcon("/rubuilder/bu/images/bu32x32.gif");
  gui_->setLargeAppIcon("/rubuilder/bu/images/bu64x64.gif");
  
  vector<toolbox::lang::Method*> methods=gui_->getMethods();
  vector<toolbox::lang::Method*>::iterator it;
  for (it=methods.begin();it!=methods.end();++it) {
    if ((*it)->type()=="cgi") {
      string name=static_cast<xgi::MethodSignature*>(*it)->name();
      xgi::bind(this,&evf::BU::webPageRequest,name);
    }
  }
  xgi::bind(this,&evf::BU::customWebPage,"customWebPage");
  
  
  // export parameters to info space(s)
  exportParameters();

  // findRcmsStateListener
  fsm_.findRcmsStateListener();
  
  // compute parameters for fed size generation (a la Emilio)
  gaussianMean_ =std::log((double)fedSizeMean_);
  gaussianWidth_=std::sqrt(std::log
                           (0.5*
                            (1+std::sqrt
                             (1.0+4.0*
                              fedSizeWidth_.value_*fedSizeWidth_.value_/
                              fedSizeMean_.value_/fedSizeMean_.value_))));

  // start monitoring thread, once and for all
  startMonitoringWorkLoop();
  
  // propagate crc flag to BUEvent
  BUEvent::setComputeCrc(crc_.value_);
}
BU::~BU ( ) [virtual]

Definition at line 167 of file BU.cc.

References events_.

{
  while (!events_.empty()) { delete events_.back(); events_.pop_back(); }
}

Member Function Documentation

void BU::actionPerformed ( xdata::Event &  e)

Definition at line 385 of file BU.cc.

References crc_, alignCSCRings::e, gui_, i2oPool_, instance, memUsedInMB_, mode_, evf::WebGUI::monInfoSpace(), and evf::BUEvent::setComputeCrc().

{
  gui_->monInfoSpace()->lock();
  if (e.type()=="urn:xdata-event:ItemGroupRetrieveEvent") {
    mode_=(0==PlaybackRawDataProvider::instance())?"RANDOM":"PLAYBACK";
    if (0!=i2oPool_) memUsedInMB_=i2oPool_->getMemoryUsage().getUsed()*9.53674e-07;
    else             memUsedInMB_=0.0;
  }
  else if (e.type()=="ItemChangedEvent") {
    string item=dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
    if (item=="crc") BUEvent::setComputeCrc(crc_.value_);
  }
  gui_->monInfoSpace()->unlock();
}
bool BU::building ( toolbox::task::WorkLoop *  wl)

Definition at line 441 of file BU.cc.

References builtIds_, events_, freeIds_, generateEvent(), isBuilding_, isHalting_, lock(), log_, nbEventsBuilt_, postSend(), unlock(), and waitBuild().

Referenced by startBuildingWorkLoop().

{
  waitBuild();
  lock();
  unsigned int buResourceId=freeIds_.front(); freeIds_.pop();
  unlock();
  
  if (buResourceId>=(uint32_t)events_.size()) {
    LOG4CPLUS_INFO(log_,"shutdown 'building' workloop.");
    isBuilding_=false;
    return false;
  }
  
  if (!isHalting_) {
    BUEvent* evt=events_[buResourceId];
    if(generateEvent(evt)) {
      lock();
      nbEventsBuilt_++;
      builtIds_.push(buResourceId);
      unlock();
      
      postSend();
    }
    else {
      LOG4CPLUS_INFO(log_,"building:received null post");
      lock();
      unsigned int saveBUResourceId = buResourceId;
      //buResourceId = freeIds_.front(); freeIds_.pop();
      freeIds_.push(saveBUResourceId);
      unlock();
      isBuilding_=false;
      return false;
    }
  }
  return true;
}
bool BU::configuring ( toolbox::task::WorkLoop *  wl)

Definition at line 178 of file BU.cc.

References alignCSCRings::e, Exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, isHalting_, log_, lumiQueryAPI::msg, and reset().

{
  isHalting_=false;
  try {
    LOG4CPLUS_INFO(log_,"Start configuring ...");
    reset();
    LOG4CPLUS_INFO(log_,"Finished configuring!");
    fsm_.fireEvent("ConfigureDone",this);
  }
  catch (xcept::Exception &e) {
    string msg = "configuring FAILED: " + (string)e.what();
    fsm_.fireFailed(msg,this);
  }

  return false;
}
toolbox::mem::Reference * BU::createMsgChain ( evf::BUEvent evt,
unsigned int  fuResourceId 
) [private]

remainder>0 means that a partial fed is left over from the last block

no remaining fed data

Definition at line 805 of file BU.cc.

References Association::block, buAppDesc_, evf::BUEvent::buResourceId(), alignCSCRings::e, evf::evtn::evm_board_sense(), evf::evtn::EVM_TCS_LSBLNR_OFFSET, evf::BUEvent::evtNumber(), Exception, fakeLs_, fakeLsUpdateSecs_, evf::BUEvent::fedAddr(), fedHeaderSize_, evf::BUEvent::fedId(), evf::BUEvent::fedSize(), fedTrailerSize_, frlHeaderSize_, fuAppDesc_, evf::evtn::GTPE_ORBTNR_OFFSET, i, i2oPool_, gen::k, prof2calltree::last, lastLsUpdate_, log_, FEDNumbering::MINTriggerEGTPFEDID, FEDNumbering::MINTriggerGTPFEDID, msgBufferSize_, evf::BUEvent::nFed(), overwriteLsId_, evf::evtn::SLINK_HALFWORD_SIZE, validFedIds_, and warning.

Referenced by sending().

{
  unsigned int msgHeaderSize =sizeof(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME);
  unsigned int msgPayloadSize=msgBufferSize_-msgHeaderSize;

  if((msgPayloadSize%4)!=0) LOG4CPLUS_ERROR(log_,"Invalid Payload Size.");
 
  /*Overwrite lumisection value stored in the event*/
  if (overwriteLsId_.value_) {
    //getting new time and increase LS if past 23 sec
    struct timezone tz;
    if (!fakeLs_) {
      fakeLs_++;
      gettimeofday(&lastLsUpdate_,&tz);
    }
    else {
      timeval newLsUpdate;
      gettimeofday(&newLsUpdate,&tz);
      if ((unsigned long)1000000*newLsUpdate.tv_sec+newLsUpdate.tv_usec 
          - (unsigned  long)1000000*lastLsUpdate_.tv_sec+lastLsUpdate_.tv_usec
          >= fakeLsUpdateSecs_.value_*1000000) 
      {
        fakeLs_++;
        lastLsUpdate_=newLsUpdate;
      }
    }

    int gtpFedPos_=-1;
    int egtpFedPos_=-1;
    for (size_t k=0;k<validFedIds_.size();k++) {
      if (evt->fedId(k)==FEDNumbering::MINTriggerGTPFEDID) {
      //insert ls value into gtp fed
        unsigned char * fgtpAddr = evt->fedAddr(k);
        unsigned int fgtpSize = evt->fedSize(k);
        if (fgtpAddr && fgtpSize) {
          gtpFedPos_=(int)k;
          evtn::evm_board_sense(fgtpAddr,fgtpSize);
          *((unsigned short*)fgtpAddr
              +sizeof(fedh_t)/sizeof(unsigned short)
              + (evtn::EVM_GTFE_BLOCK*2 + evtn::EVM_TCS_LSBLNR_OFFSET)*evtn::SLINK_HALFWORD_SIZE /sizeof(unsigned short)
           ) = (unsigned short)fakeLs_-1;
        }
      }
      if (evt->fedId(k)==FEDNumbering::MINTriggerEGTPFEDID) {
        //insert orbit value into gtpe fed
        unsigned char * fegtpAddr = evt->fedAddr(egtpFedPos_);
        unsigned int fegtpSize = evt->fedSize(egtpFedPos_);
        if (fegtpAddr && fegtpSize) {
          egtpFedPos_=(int)k;
          *(  (unsigned int*)fegtpAddr + evtn::GTPE_ORBTNR_OFFSET * evtn::SLINK_HALFWORD_SIZE/sizeof(unsigned int)
           ) = (unsigned int)(fakeLs_-1)*0x00100000;
        }
      }
    }
    if (gtpFedPos_<0) LOG4CPLUS_ERROR(log_,"Unable to find GTP FED in event!");
    if (egtpFedPos_<0 && gtpFedPos_<0) LOG4CPLUS_ERROR(log_,"Unable to find GTP or GTPE FED in event!");
  }

  toolbox::mem::Reference *head  =0;
  toolbox::mem::Reference *tail  =0;
  toolbox::mem::Reference *bufRef=0;
  
  I2O_MESSAGE_FRAME                  *stdMsg=0;
  I2O_PRIVATE_MESSAGE_FRAME          *pvtMsg=0;
  I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block =0;
  
  unsigned int iFed            =0;
  unsigned int nSuperFrag      =64;
  unsigned int nFedPerSuperFrag=validFedIds_.size()/nSuperFrag;
  unsigned int nBigSuperFrags  =validFedIds_.size()%nSuperFrag;
  
  if (evt->nFed()<nSuperFrag) {
    nSuperFrag=evt->nFed();
    nFedPerSuperFrag=1;
    nBigSuperFrags=0;
  }
  else
    {
      nFedPerSuperFrag=evt->nFed()/nSuperFrag;
      nBigSuperFrags  =evt->nFed()%nSuperFrag;
    }
  // loop over all super fragments
  for (unsigned int iSuperFrag=0;iSuperFrag<nSuperFrag;iSuperFrag++) {
    
    // compute index of last fed in this super fragment
    unsigned int nFed=iFed+nFedPerSuperFrag;
    if (iSuperFrag<nBigSuperFrags) ++nFed;
    
    // compute number of blocks in this super fragment
    unsigned int nBlock  =0;
    unsigned int curbSize=frlHeaderSize_;
    unsigned int totSize =curbSize;
    for (unsigned int i=iFed;i<nFed;i++) {
      curbSize+=evt->fedSize(i);
      totSize+=evt->fedSize(i);
      if (curbSize>msgPayloadSize) {
        curbSize+=frlHeaderSize_*(curbSize/msgPayloadSize);
        if(curbSize%msgPayloadSize)totSize+=frlHeaderSize_*(curbSize/msgPayloadSize);
        else totSize+=frlHeaderSize_*((curbSize/msgPayloadSize)-1);
        curbSize=curbSize%msgPayloadSize;
      }
    }
    nBlock=totSize/msgPayloadSize+(totSize%msgPayloadSize>0 ? 1 : 0);
    
    
    // loop over all blocks (msgs) in the current super fragment
    unsigned int   remainder     =0;
    bool           fedTrailerLeft=false;
    bool           last          =false;
    bool           warning       =false;
    unsigned char *startOfPayload=0;
    U32            payload(0);
    
    for(unsigned int iBlock=0;iBlock<nBlock;iBlock++) {
      
      // If last block and its partial (there can be only 0 or 1 partial)
      payload=msgPayloadSize;
      
      // Allocate memory for a fragment block / message
      try {
        bufRef=toolbox::mem::getMemoryPoolFactory()->getFrame(i2oPool_,
                                                              msgBufferSize_);
      }
      catch(xcept::Exception &e) {
        LOG4CPLUS_FATAL(log_,"xdaq::frameAlloc failed");
      }
      
      // Fill in the fields of the fragment block / message
      stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
      pvtMsg=(I2O_PRIVATE_MESSAGE_FRAME*)stdMsg;
      block =(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)stdMsg;
      
      pvtMsg->XFunctionCode   =I2O_FU_TAKE;
      pvtMsg->OrganizationID  =XDAQ_ORGANIZATION_ID;
      
      stdMsg->MessageSize     =(msgHeaderSize + payload) >> 2;
      stdMsg->Function        =I2O_PRIVATE_MESSAGE;
      stdMsg->VersionOffset   =0;
      stdMsg->MsgFlags        =0;
      stdMsg->InitiatorAddress=i2o::utils::getAddressMap()->getTid(buAppDesc_);
      stdMsg->TargetAddress   =i2o::utils::getAddressMap()->getTid(fuAppDesc_);
      
      block->buResourceId           =evt->buResourceId();
      block->fuTransactionId        =fuResourceId;
      block->blockNb                =iBlock;
      block->nbBlocksInSuperFragment=nBlock;
      block->superFragmentNb        =iSuperFrag;
      block->nbSuperFragmentsInEvent=nSuperFrag;
      block->eventNumber            =evt->evtNumber();
      
      // Fill in payload 
      startOfPayload   =(unsigned char*)block+msgHeaderSize;
      frlh_t* frlHeader=(frlh_t*)startOfPayload;
      frlHeader->trigno=evt->evtNumber();
      frlHeader->segno =iBlock;
      
      unsigned char *startOfFedBlocks=startOfPayload+frlHeaderSize_;
      payload              -=frlHeaderSize_;
      frlHeader->segsize    =payload;
      unsigned int leftspace=payload;
      
      // a fed trailer was left over from the previous block
      if(fedTrailerLeft) {
        memcpy(startOfFedBlocks,
               evt->fedAddr(iFed)+evt->fedSize(iFed)-fedTrailerSize_,
               fedTrailerSize_);
        
        startOfFedBlocks+=fedTrailerSize_;
        leftspace       -=fedTrailerSize_;
        remainder        =0;
        fedTrailerLeft   =false;
        
        // if this is the last fed, adjust block (msg) size and set last=true
        if((iFed==nFed-1) && !last) {
          frlHeader->segsize-=leftspace;
          int msgSize=stdMsg->MessageSize << 2;
          msgSize   -=leftspace;
          bufRef->setDataSize(msgSize);
          stdMsg->MessageSize = msgSize >> 2;
          frlHeader->segsize=frlHeader->segsize | FRL_LAST_SEGM;
          last=true;
        }
        
        // !! increment iFed !!
        iFed++;
      }
      
      if (remainder>0) {
        
        // the remaining fed fits entirely into the new block
        if(payload>=remainder) {
          memcpy(startOfFedBlocks,
                 evt->fedAddr(iFed)+evt->fedSize(iFed)-remainder,
                 remainder);
          
          startOfFedBlocks+=remainder;
          leftspace       -=remainder;
          
          // if this is the last fed in the superfragment, earmark it
          if(iFed==nFed-1) {
            frlHeader->segsize-=leftspace;
            int msgSize=stdMsg->MessageSize << 2;
            msgSize   -=leftspace;
            bufRef->setDataSize(msgSize);
            stdMsg->MessageSize = msgSize >> 2;
            frlHeader->segsize=frlHeader->segsize | FRL_LAST_SEGM;
            last=true;
          }
          
          // !! increment iFed !!
          iFed++;
          
          // start new fed -> set remainder to 0!
          remainder=0;
        }
        // the remaining payload fits, but not the fed trailer
        else if (payload>=(remainder-fedTrailerSize_)) {
          memcpy(startOfFedBlocks,
                 evt->fedAddr(iFed)+evt->fedSize(iFed)-remainder,
                 remainder-fedTrailerSize_);
          
          frlHeader->segsize=remainder-fedTrailerSize_;
          fedTrailerLeft    =true;
          leftspace        -=(remainder-fedTrailerSize_);
          remainder         =fedTrailerSize_;
        }
        // the remaining payload fits only partially, fill whole block
        else {
          memcpy(startOfFedBlocks,
                 evt->fedAddr(iFed)+evt->fedSize(iFed)-remainder,payload);
          remainder-=payload;
          leftspace =0;
        }
      }
      
      if(remainder==0) {
        
        // loop on feds
        while(iFed<nFed) {
          
          // if the next header does not fit, jump to following block
          if((int)leftspace<fedHeaderSize_) {
            frlHeader->segsize-=leftspace;
            break;
          }
          
          memcpy(startOfFedBlocks,evt->fedAddr(iFed),fedHeaderSize_);
          
          leftspace       -=fedHeaderSize_;
          startOfFedBlocks+=fedHeaderSize_;
          
          // fed fits with its trailer
          if(evt->fedSize(iFed)-fedHeaderSize_<=leftspace) {
            memcpy(startOfFedBlocks,
                   evt->fedAddr(iFed)+fedHeaderSize_,
                   evt->fedSize(iFed)-fedHeaderSize_);
            
            leftspace       -=(evt->fedSize(iFed)-fedHeaderSize_);
            startOfFedBlocks+=(evt->fedSize(iFed)-fedHeaderSize_);
          }
          // fed payload fits only without fed trailer
          else if(evt->fedSize(iFed)-fedHeaderSize_-fedTrailerSize_<=leftspace) {
            memcpy(startOfFedBlocks,
                   evt->fedAddr(iFed)+fedHeaderSize_,
                   evt->fedSize(iFed)-fedHeaderSize_-fedTrailerSize_);
            
            leftspace         -=(evt->fedSize(iFed)-fedHeaderSize_-fedTrailerSize_);
            frlHeader->segsize-=leftspace;
            fedTrailerLeft     =true;
            remainder          =fedTrailerSize_;
            
            break;
          }
          // fed payload fits only partially
          else {
            memcpy(startOfFedBlocks,evt->fedAddr(iFed)+fedHeaderSize_,leftspace);
            remainder=evt->fedSize(iFed)-fedHeaderSize_-leftspace;
            leftspace=0;
            
            break;
          }
          
          // !! increase iFed !!
          iFed++;
          
        } // while (iFed<fedN_)
        
        // earmark the last block
        if (iFed==nFed && remainder==0 && !last) {
          frlHeader->segsize-=leftspace;
          int msgSize=stdMsg->MessageSize << 2;
          msgSize   -=leftspace;
          bufRef->setDataSize(msgSize);
          stdMsg->MessageSize=msgSize >> 2;
          frlHeader->segsize =frlHeader->segsize | FRL_LAST_SEGM;
          last=true;
        }
        
      } // if (remainder==0)
      
      if(iSuperFrag==0&&iBlock==0) { // This is the first fragment block / message
        head=bufRef;
        tail=bufRef;
      }
      else {
        tail->setNextReference(bufRef);
        tail=bufRef;
      }
      
      if((iBlock==nBlock-1) && remainder!=0) {
        nBlock++;
        warning=true;
      }
      
    } // for (iBlock)
    
    // fix case where block estimate was wrong
    if(warning) {
      toolbox::mem::Reference* next=head;
      do {
        block =(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)next->getDataLocation();
        if (block->superFragmentNb==iSuperFrag)
          block->nbBlocksInSuperFragment=nBlock;                
      } while((next=next->getNextReference()));
    }
  
  } // iSuperFrag < nSuperFrag
  
  return head; // return the top of the chain
}
void BU::customWebPage ( xgi::Input *  in,
xgi::Output *  out 
) throw (xgi::exception::Exception)

Definition at line 412 of file BU.cc.

References dbtoconf::out.

Referenced by BU().

{
  *out<<"<html></html>"<<endl;
}
double BU::deltaT ( const struct timeval *  start,
const struct timeval *  end 
) [private]

Definition at line 727 of file BU.cc.

Referenced by monitoring().

{
  unsigned int  sec;
  unsigned int  usec;
  
  sec = end->tv_sec - start->tv_sec;
  
  if(end->tv_usec > start->tv_usec) {
    usec = end->tv_usec - start->tv_usec;
  }
  else {
    sec--;
    usec = 1000000 - ((unsigned int )(start->tv_usec - end->tv_usec));
  }
  
  return ((double)sec) + ((double)usec) / 1000000.0;
}
void BU::dumpFrame ( unsigned char *  data,
unsigned int  len 
) [private]

Definition at line 1144 of file BU.cc.

References trackerHits::c, i, and pos.

{
  char left1[20];
  char left2[20];
  char right1[20];
  char right2[20];

  printf("Byte  0  1  2  3  4  5  6  7\n");
  
  int c(0);
  int pos(0);
  
  for (unsigned int i=0;i<(len/8);i++) {
    int rpos(0);
    int off(3);
    for (pos=0;pos<12;pos+=3) {
      sprintf(&left1[pos],"%2.2x ",
              ((unsigned char*)data)[c+off]);
      sprintf(&right1[rpos],"%1c",
              ((data[c+off] > 32)&&(data[c+off] < 127)) ? data[c+off] : '.');
      sprintf (&left2[pos],"%2.2x ",
               ((unsigned char*)data)[c+off+4]);
      sprintf (&right2[rpos],"%1c",
               ((data[c+off+4] > 32)&&(data[c+off+4]<127)) ? data[c+off+4] : '.');
      rpos++;
      off--;
    }
    c+=8;
    
    printf ("%4d: %s%s ||  %s%s  %p\n",
            c-8, left1, left2, right1, right2, &data[c-8]);
  }
  
  fflush(stdout);       
}
bool BU::enabling ( toolbox::task::WorkLoop *  wl)

Definition at line 197 of file BU.cc.

References alignCSCRings::e, Exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, i, FEDNumbering::inRange(), FEDNumbering::inRangeNoGT(), PlaybackRawDataProvider::instance(), isBuilding_, isHalting_, isSending_, log_, FEDNumbering::MAXFEDID, lumiQueryAPI::msg, startBuildingWorkLoop(), startSendingWorkLoop(), and validFedIds_.

{
  isHalting_=false;
  try {
    LOG4CPLUS_INFO(log_,"Start enabling ...");
    // determine valid fed ids (assumes Playback EP is already configured hence PBRDP::instance 
    // not null in case we are playing back)
    if (0!=PlaybackRawDataProvider::instance()) {
      for (unsigned int i=0;i<(unsigned int)FEDNumbering::MAXFEDID+1;i++)
        if (FEDNumbering::inRange(i)) validFedIds_.push_back(i);
    }
    else{
      for (unsigned int i=0;i<(unsigned int)FEDNumbering::MAXFEDID+1;i++)
        if (FEDNumbering::inRangeNoGT(i)) validFedIds_.push_back(i);
    }
    if (!isBuilding_) startBuildingWorkLoop();
    if (!isSending_)  startSendingWorkLoop();
    LOG4CPLUS_INFO(log_,"Finished enabling!");
    fsm_.fireEvent("EnableDone",this);
  }
  catch (xcept::Exception &e) {
    string msg = "enabling FAILED: " + (string)e.what();
    fsm_.fireFailed(msg,this);
  }
  
  return false;
}
void BU::exportParameters ( ) [private]

Definition at line 629 of file BU.cc.

References evf::WebGUI::addItemChangedListener(), evf::WebGUI::addMonitorCounter(), evf::WebGUI::addMonitorParam(), evf::WebGUI::addStandardParam(), average_, class_, crc_, deltaN_, deltaSumOfSizes_, deltaSumOfSquares_, deltaT_, eventBufferSize_, evf::WebGUI::exportParameters(), fakeLsUpdateSecs_, fedSizeMax_, fedSizeMean_, fedSizeWidth_, firstEvent_, evf::StateMachine::foundRcmsStateListener(), fsm_, gui_, hostname_, instance_, log_, memUsedInMB_, mode_, monSleepSec_, msgBufferSize_, nbEventsBuilt_, nbEventsDiscarded_, nbEventsInBU_, nbEventsRequested_, nbEventsSent_, overwriteEvtId_, overwriteLsId_, queueSize_, rate_, evf::StateMachine::rcmsStateListener(), replay_, rms_, runNumber_, evf::StateMachine::stateName(), throughput_, url_, and useFixedFedSize_.

Referenced by BU().

{
  if (0==gui_) {
    LOG4CPLUS_ERROR(log_,"No GUI, can't export parameters");
    return;
  }
  
  gui_->addMonitorParam("url",                &url_);
  gui_->addMonitorParam("class",              &class_);
  gui_->addMonitorParam("instance",           &instance_);
  gui_->addMonitorParam("hostname",           &hostname_);
  gui_->addMonitorParam("runNumber",          &runNumber_);
  gui_->addMonitorParam("stateName",          fsm_.stateName());
  gui_->addMonitorParam("memUsedInMB",        &memUsedInMB_);
  gui_->addMonitorParam("deltaT",             &deltaT_);
  gui_->addMonitorParam("deltaN",             &deltaN_);
  gui_->addMonitorParam("deltaSumOfSquares",  &deltaSumOfSquares_);
  gui_->addMonitorParam("deltaSumOfSizes",    &deltaSumOfSizes_);
  gui_->addMonitorParam("throughput",         &throughput_);
  gui_->addMonitorParam("average",            &average_);
  gui_->addMonitorParam("rate",               &rate_);
  gui_->addMonitorParam("rms",                &rms_);

  gui_->addMonitorCounter("nbEvtsInBU",       &nbEventsInBU_);
  gui_->addMonitorCounter("nbEvtsRequested",  &nbEventsRequested_);
  gui_->addMonitorCounter("nbEvtsBuilt",      &nbEventsBuilt_);
  gui_->addMonitorCounter("nbEvtsSent",       &nbEventsSent_);
  gui_->addMonitorCounter("nbEvtsDiscarded",  &nbEventsDiscarded_);

  gui_->addStandardParam("mode",              &mode_);
  gui_->addStandardParam("replay",            &replay_);
  gui_->addStandardParam("overwriteEvtId",    &overwriteEvtId_);
  gui_->addStandardParam("overwriteLsId",     &overwriteLsId_);
  gui_->addStandardParam("fakeLsUpdateSecs",   &fakeLsUpdateSecs_);
  gui_->addStandardParam("crc",               &crc_);
  gui_->addStandardParam("firstEvent",        &firstEvent_);
  gui_->addStandardParam("queueSize",         &queueSize_);
  gui_->addStandardParam("eventBufferSize",   &eventBufferSize_);
  gui_->addStandardParam("msgBufferSize",     &msgBufferSize_);
  gui_->addStandardParam("fedSizeMax",        &fedSizeMax_);
  gui_->addStandardParam("fedSizeMean",       &fedSizeMean_);
  gui_->addStandardParam("fedSizeWidth",      &fedSizeWidth_);
  gui_->addStandardParam("useFixedFedSize",   &useFixedFedSize_);
  gui_->addStandardParam("monSleepSec",       &monSleepSec_);
  gui_->addStandardParam("rcmsStateListener",     fsm_.rcmsStateListener());
  gui_->addStandardParam("foundRcmsStateListener",fsm_.foundRcmsStateListener());

  
  gui_->exportParameters();

  gui_->addItemChangedListener("crc",this);
  
}
xoap::MessageReference BU::fsmCallback ( xoap::MessageReference  msg) throw (xoap::exception::Exception)

Definition at line 311 of file BU.cc.

References lumiQueryAPI::msg.

{
  return fsm_.commandCallback(msg);
}
bool BU::generateEvent ( evf::BUEvent evt) [private]

Definition at line 747 of file BU.cc.

References event(), fedh_struct::eventid, events_, evtNumber_, funct::exp(), fedHeaderSize_, fedSizeMax_, fedSizeMean_, fedTrailerSize_, firstEvent_, gaussianMean_, gaussianWidth_, i, evf::BUEvent::initialize(), instance, PlaybackRawDataProvider::instance(), nbEventsBuilt_, overwriteEvtId_, replay_, convertSQLiteXML::runNumber, PlaybackRawDataProvider::setFreeToEof(), useFixedFedSize_, validFedIds_, evf::BUEvent::writeFed(), evf::BUEvent::writeFedHeader(), and evf::BUEvent::writeFedTrailer().

Referenced by building().

{
  // replay?
  if (replay_.value_&&nbEventsBuilt_>=(uint32_t)events_.size()) 
    {
      if (0!=PlaybackRawDataProvider::instance())
        PlaybackRawDataProvider::instance()->setFreeToEof();
      return true;
    }  
  // PLAYBACK mode
  if (0!=PlaybackRawDataProvider::instance()) {
    
    unsigned int runNumber,evtNumber;

    FEDRawDataCollection* event=
      PlaybackRawDataProvider::instance()->getFEDRawData(runNumber,evtNumber);
    if(event == 0) return false;
    evt->initialize(evtNumber);
    
    for (unsigned int i=0;i<validFedIds_.size();i++) {
      unsigned int   fedId  =validFedIds_[i];
      unsigned int   fedSize=event->FEDData(fedId).size();
      unsigned char* fedAddr=event->FEDData(fedId).data();
      if (overwriteEvtId_.value_ && fedAddr != 0) {
        fedh_t *fedHeader=(fedh_t*)fedAddr;
        fedHeader->eventid=(fedHeader->eventid&0xFF000000)+(evtNumber&0x00FFFFFF);
      }
      if (fedSize>0) evt->writeFed(fedId,fedAddr,fedSize);
    }
    delete event;
  }
  // RANDOM mode
  else {
    unsigned int evtNumber=(firstEvent_+evtNumber_++)%0x1000000;
    evt->initialize(evtNumber);
    unsigned int fedSizeMin=fedHeaderSize_+fedTrailerSize_;
    for (unsigned int i=0;i<validFedIds_.size();i++) {
      unsigned int fedId(validFedIds_[i]);
      unsigned int fedSize(fedSizeMean_);
      if (!useFixedFedSize_) {
        double logFedSize=CLHEP::RandGauss::shoot(gaussianMean_,gaussianWidth_);
        fedSize=(unsigned int)(std::exp(logFedSize));
        if (fedSize<fedSizeMin)  fedSize=fedSizeMin;
        if (fedSize>fedSizeMax_) fedSize=fedSizeMax_;
        fedSize-=fedSize%8;
      }
      
      evt->writeFed(fedId,0,fedSize);
      evt->writeFedHeader(i);
      evt->writeFedTrailer(i);
    }
    
  }
  return true;
}
bool BU::halting ( toolbox::task::WorkLoop *  wl)

Definition at line 280 of file BU.cc.

References builtIds_, alignCSCRings::e, events_, Exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), freeIds_, fsm_, instance, PlaybackRawDataProvider::instance(), isBuilding_, isHalting_, isSending_, lock(), log_, lumiQueryAPI::msg, nbEventsBuilt_, postBuild(), postSend(), replay_, and unlock().

{
  try {
    LOG4CPLUS_INFO(log_,"Start halting ...");
    isHalting_=true;
    if (isBuilding_&&isSending_) {
      lock();
      freeIds_.push(events_.size());
      builtIds_.push(events_.size());
      unlock();
      postBuild();
      postSend();
    }
    if (0!=PlaybackRawDataProvider::instance()&&
        (!replay_.value_||nbEventsBuilt_<(uint32_t)events_.size())) { 
      PlaybackRawDataProvider::instance()->setFreeToEof();
      while (!PlaybackRawDataProvider::instance()->areFilesClosed()) usleep(1000000);
      usleep(100000);
    }
    LOG4CPLUS_INFO(log_,"Finished halting!");
    fsm_.fireEvent("HaltDone",this);
  }
  catch (xcept::Exception &e) {
    string msg = "halting FAILED: " + (string)e.what();
    fsm_.fireFailed(msg,this);
  }
  return false;
}
void BU::I2O_BU_ALLOCATE_Callback ( toolbox::mem::Reference *  bufRef)

Definition at line 319 of file BU.cc.

References fuAppDesc_, i, isHalting_, lock(), log_, lumiQueryAPI::msg, nbEventsInBU_, nbEventsRequested_, postRqst(), rqstIds_, and unlock().

Referenced by BU().

{
  if (isHalting_) {
    LOG4CPLUS_WARN(log_,"Ignore BU_ALLOCATE message while halting.");
    bufRef->release();
    return;
  }
  
  I2O_MESSAGE_FRAME             *stdMsg;
  I2O_BU_ALLOCATE_MESSAGE_FRAME *msg;
  
  stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
  msg   =(I2O_BU_ALLOCATE_MESSAGE_FRAME*)stdMsg;
  
  if (0==fuAppDesc_) {
    I2O_TID fuTid=stdMsg->InitiatorAddress;
    fuAppDesc_=i2o::utils::getAddressMap()->getApplicationDescriptor(fuTid);
  }
  
  for (unsigned int i=0;i<msg->n;i++) {
    unsigned int fuResourceId=msg->allocate[i].fuTransactionId;
    lock();
    rqstIds_.push(fuResourceId);
    postRqst();
    nbEventsRequested_++;
    nbEventsInBU_++;
    unlock();
  }

  bufRef->release();
}
void BU::I2O_BU_DISCARD_Callback ( toolbox::mem::Reference *  bufRef)

Definition at line 353 of file BU.cc.

References freeIds_, isHalting_, lock(), log_, lumiQueryAPI::msg, nbEventsDiscarded_, postBuild(), query::result, sentIds_, and unlock().

Referenced by BU().

{
  if (isHalting_) {
    LOG4CPLUS_WARN(log_,"Ignore BU_DISCARD message while halting.");
    bufRef->release();
    return;
  }

  I2O_MESSAGE_FRAME           *stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
  I2O_BU_DISCARD_MESSAGE_FRAME*msg   =(I2O_BU_DISCARD_MESSAGE_FRAME*)stdMsg;
  unsigned int buResourceId=msg->buResourceId[0];

  lock();
  int result=sentIds_.erase(buResourceId);
  unlock();
  
  if (!result) {
    LOG4CPLUS_ERROR(log_,"can't discard unknown buResourceId '"<<buResourceId<<"'");
  }
  else {
    lock();
    freeIds_.push(buResourceId);
    nbEventsDiscarded_.value_++;
    unlock();
    postBuild();
  }
  
  bufRef->release();
}
void evf::BU::lock ( void  ) [inline, private]

Definition at line 110 of file BU.h.

References lock_.

Referenced by building(), halting(), I2O_BU_ALLOCATE_Callback(), I2O_BU_DISCARD_Callback(), monitoring(), sending(), and stopping().

{ sem_wait(&lock_); }
bool BU::monitoring ( toolbox::task::WorkLoop *  wl)

Definition at line 562 of file BU.cc.

References average_, deltaN_, deltaSumOfSizes_, deltaSumOfSquares_, deltaT(), deltaT_, gui_, lock(), timingPdfMaker::mean, evf::WebGUI::monInfoSpace(), monLastN_, monLastSumOfSizes_, monLastSumOfSquares_, monSleepSec_, monStartTime_, nbEventsBuilt_, rate_, rms_, stor::utils::sleep(), mathSSE::sqrt(), sumOfSizes_, sumOfSquares_, throughput_, and unlock().

Referenced by startMonitoringWorkLoop().

{
  struct timeval  monEndTime;
  struct timezone timezone;
  
  gettimeofday(&monEndTime,&timezone);
  
  lock();
  unsigned int monN           =nbEventsBuilt_.value_;
  uint64_t     monSumOfSquares=sumOfSquares_;
  unsigned int monSumOfSizes  =sumOfSizes_;
  uint64_t     deltaSumOfSquares;
  unlock();
  
  gui_->monInfoSpace()->lock();
  
  deltaT_.value_=deltaT(&monStartTime_,&monEndTime);
  monStartTime_=monEndTime;
  
  deltaN_.value_=monN-monLastN_;
  monLastN_=monN;

  deltaSumOfSquares=monSumOfSquares-monLastSumOfSquares_;
  deltaSumOfSquares_.value_=(double)deltaSumOfSquares;
  monLastSumOfSquares_=monSumOfSquares;
  
  deltaSumOfSizes_.value_=monSumOfSizes-monLastSumOfSizes_;
  monLastSumOfSizes_=monSumOfSizes;
  
  if (deltaT_.value_!=0) {
    throughput_=deltaSumOfSizes_.value_/deltaT_.value_;
    rate_      =deltaN_.value_/deltaT_.value_;
  }
  else {
    throughput_=0.0;
    rate_      =0.0;
  }
  
  double meanOfSquares,mean,squareOfMean,variance;
  
  if(deltaN_.value_!=0) {
    meanOfSquares=deltaSumOfSquares_.value_/((double)(deltaN_.value_));
    mean=((double)(deltaSumOfSizes_.value_))/((double)(deltaN_.value_));
    squareOfMean=mean*mean;
    variance=meanOfSquares-squareOfMean;
    average_=deltaSumOfSizes_.value_/deltaN_.value_;
    rms_    =std::sqrt(variance);
  }
  else {
    average_=0.0;
    rms_    =0.0;
  }

  gui_->monInfoSpace()->unlock();
  
  ::sleep(monSleepSec_.value_);

  return true;
}
void evf::BU::postBuild ( ) [inline, private]

Definition at line 113 of file BU.h.

References buildSem_.

Referenced by halting(), I2O_BU_DISCARD_Callback(), and stopping().

{ sem_post(&buildSem_); }
void evf::BU::postRqst ( ) [inline, private]

Definition at line 117 of file BU.h.

References rqstSem_.

Referenced by I2O_BU_ALLOCATE_Callback().

{ sem_post(&rqstSem_); }
void evf::BU::postSend ( ) [inline, private]

Definition at line 115 of file BU.h.

References sendSem_.

Referenced by building(), halting(), and stopping().

{ sem_post(&sendSem_); }
void BU::reset ( void  ) [private]

Definition at line 685 of file BU.cc.

References average_, buildSem_, builtIds_, deltaN_, deltaSumOfSizes_, deltaSumOfSquares_, deltaT_, eventBufferSize_, events_, fakeLs_, freeIds_, gui_, i, lock_, monLastN_, monLastSumOfSizes_, monLastSumOfSquares_, queueSize_, rate_, evf::WebGUI::resetCounters(), rms_, rqstIds_, rqstSem_, sendSem_, sentIds_, throughput_, and validFedIds_.

Referenced by configuring(), and stopping().

{
  gui_->resetCounters();
  
  deltaT_             =0.0;
  deltaN_             =  0;
  deltaSumOfSquares_  =  0;
  deltaSumOfSizes_    =  0;
  
  throughput_         =0.0;
  average_            =  0;
  rate_               =  0;
  rms_                =  0;

  monLastN_           =  0;
  monLastSumOfSquares_=  0;
  monLastSumOfSizes_  =  0;
  
  while (events_.size()) {
    delete events_.back();
    events_.pop_back();
  }
  
  while (!rqstIds_.empty())  rqstIds_.pop();
  while (!freeIds_.empty())  freeIds_.pop();
  while (!builtIds_.empty()) builtIds_.pop();
  sentIds_.clear();
 
  sem_init(&lock_,0,1);
  sem_init(&buildSem_,0,queueSize_);
  sem_init(&sendSem_,0,0);
  sem_init(&rqstSem_,0,0);
  
  for (unsigned int i=0;i<queueSize_;i++) {
    events_.push_back(new BUEvent(i,eventBufferSize_));
    freeIds_.push(i);
  }
  validFedIds_.clear();
  fakeLs_=0;
}
bool BU::sending ( toolbox::task::WorkLoop *  wl)

Definition at line 501 of file BU.cc.

References buAppContext_, buAppDesc_, builtIds_, createMsgChain(), events_, evf::BUEvent::evtSize(), fuAppDesc_, isHalting_, isSending_, lock(), log_, lumiQueryAPI::msg, nbEventsInBU_, nbEventsSent_, rqstIds_, sentIds_, sumOfSizes_, sumOfSquares_, unlock(), waitRqst(), and waitSend().

Referenced by startSendingWorkLoop().

{
  waitSend();
  lock();
  unsigned int buResourceId=builtIds_.front(); builtIds_.pop();
  unlock();
  
  if (buResourceId>=(uint32_t)events_.size()) {
    LOG4CPLUS_INFO(log_,"shutdown 'sending' workloop.");
    isSending_=false;
    return false;
  }

  if (!isHalting_) {
    waitRqst();
    lock();
    unsigned int fuResourceId=rqstIds_.front(); rqstIds_.pop();
    unlock();
    
    BUEvent* evt=events_[buResourceId];
    toolbox::mem::Reference* msg=createMsgChain(evt,fuResourceId);
    
    lock();
    sumOfSquares_+=(uint64_t)evt->evtSize()*(uint64_t)evt->evtSize();
    sumOfSizes_  +=evt->evtSize();
    nbEventsInBU_--;
    nbEventsSent_++;
    sentIds_.insert(buResourceId);
    unlock();
    
    buAppContext_->postFrame(msg,buAppDesc_,fuAppDesc_);  
  }
  
  return true;
}
void BU::startBuildingWorkLoop ( ) throw (evf::Exception)

Definition at line 420 of file BU.cc.

References asBuilding_, building(), alignCSCRings::e, Exception, isBuilding_, log_, lumiQueryAPI::msg, sourceId_, and wlBuilding_.

Referenced by enabling().

{
  try {
    LOG4CPLUS_INFO(log_,"Start 'building' workloop");
    wlBuilding_=
      toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+
                                                       "Building",
                                                       "waiting");
    if (!wlBuilding_->isActive()) wlBuilding_->activate();
    asBuilding_=toolbox::task::bind(this,&BU::building,sourceId_+"Building");
    wlBuilding_->submit(asBuilding_);
    isBuilding_=true;
  }
  catch (xcept::Exception& e) {
    string msg = "Failed to start workloop 'building'.";
    XCEPT_RETHROW(evf::Exception,msg,e);
  }
}
void BU::startMonitoringWorkLoop ( ) throw (evf::Exception)

Definition at line 539 of file BU.cc.

References asMonitoring_, alignCSCRings::e, Exception, log_, monitoring(), monStartTime_, lumiQueryAPI::msg, sourceId_, and wlMonitoring_.

Referenced by BU().

{
  struct timezone timezone;
  gettimeofday(&monStartTime_,&timezone);
  
  try {
    LOG4CPLUS_INFO(log_,"Start 'monitoring' workloop");
    wlMonitoring_=
      toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+
                                                       "Monitoring",
                                                       "waiting");
    if (!wlMonitoring_->isActive()) wlMonitoring_->activate();
    asMonitoring_=toolbox::task::bind(this,&BU::monitoring,sourceId_+"Monitoring");
    wlMonitoring_->submit(asMonitoring_);
  }
  catch (xcept::Exception& e) {
    string msg = "Failed to start workloop 'monitoring'.";
    XCEPT_RETHROW(evf::Exception,msg,e);
  }
}
void BU::startSendingWorkLoop ( ) throw (evf::Exception)

Definition at line 480 of file BU.cc.

References asSending_, alignCSCRings::e, Exception, isSending_, log_, lumiQueryAPI::msg, sending(), sourceId_, and wlSending_.

Referenced by enabling().

{
  try {
    LOG4CPLUS_INFO(log_,"Start 'sending' workloop");
    wlSending_=toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+
                                                                "Sending",
                                                                "waiting");
    if (!wlSending_->isActive()) wlSending_->activate();

    asSending_=toolbox::task::bind(this,&BU::sending,sourceId_+"Sending");
    wlSending_->submit(asSending_);
    isSending_=true;
  }
  catch (xcept::Exception& e) {
    string msg = "Failed to start workloop 'sending'.";
    XCEPT_RETHROW(evf::Exception,msg,e);
  }
}
bool BU::stopping ( toolbox::task::WorkLoop *  wl)

Definition at line 227 of file BU.cc.

References builtIds_, alignCSCRings::e, events_, Exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), freeIds_, fsm_, instance, PlaybackRawDataProvider::instance(), lock(), log_, lumiQueryAPI::msg, postBuild(), postSend(), reset(), sentIds_, stor::utils::sleep(), and unlock().

{
  try {
    LOG4CPLUS_INFO(log_,"Start stopping :) ...");

    if (0!=PlaybackRawDataProvider::instance()) { /*&&
        (!replay_.value_||nbEventsBuilt_<(uint32_t)events_.size())) { */
      lock();
      freeIds_.push(events_.size()); 
      unlock();
      postBuild();
      while (!builtIds_.empty()) {
        LOG4CPLUS_INFO(log_,"wait to flush ... #builtIds="<<builtIds_.size());
	::sleep(1);
      }
      // let the playback go to the last event and exit
      PlaybackRawDataProvider::instance()->setFreeToEof(); 
      while (!PlaybackRawDataProvider::instance()->areFilesClosed()) usleep(1000000);
      usleep(100000);
    }
    
    lock();
    builtIds_.push(events_.size());
    unlock();

    postSend();
    while (!sentIds_.empty()) {
      LOG4CPLUS_INFO(log_,"wait to flush ...");
      ::sleep(1);
    }
    reset();
    //postBuild();
    /* this is not needed and should not run if reset is called
    if (0!=PlaybackRawDataProvider::instance()&&
        (replay_.value_&&nbEventsBuilt_>=(uint32_t)events_.size())) {
      lock();
      freeIds_.push(events_.size());
      unlock();
      postBuild();
    }
    */
    LOG4CPLUS_INFO(log_,"Finished stopping!");
    fsm_.fireEvent("StopDone",this);
  }
  catch (xcept::Exception &e) {
    string msg = "stopping FAILED: " + (string)e.what();
    fsm_.fireFailed(msg,this);
  }
  return false;
}
void evf::BU::unlock ( void  ) [inline, private]

Definition at line 111 of file BU.h.

References lock_.

Referenced by building(), halting(), I2O_BU_ALLOCATE_Callback(), I2O_BU_DISCARD_Callback(), monitoring(), sending(), and stopping().

{ sem_post(&lock_); }
void evf::BU::waitBuild ( ) [inline, private]

Definition at line 112 of file BU.h.

References buildSem_.

Referenced by building().

{ sem_wait(&buildSem_); }
void evf::BU::waitRqst ( ) [inline, private]

Definition at line 116 of file BU.h.

References rqstSem_.

Referenced by sending().

{ sem_wait(&rqstSem_); }
void evf::BU::waitSend ( ) [inline, private]

Definition at line 114 of file BU.h.

References sendSem_.

Referenced by sending().

{ sem_wait(&sendSem_); }
void BU::webPageRequest ( xgi::Input *  in,
xgi::Output *  out 
) throw (xgi::exception::Exception)

Definition at line 402 of file BU.cc.

References recoMuon::in, mergeVDriftHistosByStation::name, and dbtoconf::out.

Referenced by BU().

{
  string name=in->getenv("PATH_INFO");
  if (name.empty()) name="defaultWebPage";
  static_cast<xgi::MethodSignature*>(gui_->getMethod(name))->invoke(in,out);
}
evf::BU::XDAQ_INSTANTIATOR ( )

Member Data Documentation

toolbox::task::ActionSignature* evf::BU::asBuilding_ [private]

Definition at line 169 of file BU.h.

Referenced by startBuildingWorkLoop().

toolbox::task::ActionSignature* evf::BU::asMonitoring_ [private]

Definition at line 177 of file BU.h.

Referenced by startMonitoringWorkLoop().

toolbox::task::ActionSignature* evf::BU::asSending_ [private]

Definition at line 173 of file BU.h.

Referenced by startSendingWorkLoop().

xdata::Double evf::BU::average_ [private]

Definition at line 196 of file BU.h.

Referenced by exportParameters(), monitoring(), and reset().

xdaq::ApplicationContext* evf::BU::buAppContext_ [private]

Definition at line 146 of file BU.h.

Referenced by sending().

xdaq::ApplicationDescriptor* evf::BU::buAppDesc_ [private]

Definition at line 140 of file BU.h.

Referenced by createMsgChain(), and sending().

sem_t evf::BU::buildSem_ [private]

Definition at line 244 of file BU.h.

Referenced by postBuild(), reset(), and waitBuild().

std::queue<unsigned int> evf::BU::builtIds_ [private]

Definition at line 158 of file BU.h.

Referenced by building(), halting(), reset(), sending(), and stopping().

xdata::String evf::BU::class_ [private]

Definition at line 184 of file BU.h.

Referenced by BU(), and exportParameters().

xdata::Boolean evf::BU::crc_ [private]

Definition at line 210 of file BU.h.

Referenced by actionPerformed(), BU(), and exportParameters().

xdata::UnsignedInteger32 evf::BU::deltaN_ [private]

Definition at line 191 of file BU.h.

Referenced by exportParameters(), monitoring(), and reset().

xdata::UnsignedInteger32 evf::BU::deltaSumOfSizes_ [private]

Definition at line 193 of file BU.h.

Referenced by exportParameters(), monitoring(), and reset().

xdata::Double evf::BU::deltaSumOfSquares_ [private]

Definition at line 192 of file BU.h.

Referenced by exportParameters(), monitoring(), and reset().

xdata::Double evf::BU::deltaT_ [private]

Definition at line 190 of file BU.h.

Referenced by exportParameters(), monitoring(), and reset().

xdata::UnsignedInteger32 evf::BU::eventBufferSize_ [private]

Definition at line 216 of file BU.h.

Referenced by exportParameters(), and reset().

std::vector<evf::BUEvent*> evf::BU::events_ [private]

Definition at line 155 of file BU.h.

Referenced by building(), generateEvent(), halting(), reset(), sending(), stopping(), and ~BU().

unsigned int evf::BU::evtNumber_ [private]

Definition at line 160 of file BU.h.

Referenced by generateEvent().

unsigned int evf::BU::fakeLs_ [private]

Definition at line 224 of file BU.h.

Referenced by createMsgChain(), and reset().

xdata::UnsignedInteger32 evf::BU::fakeLsUpdateSecs_ [private]

Definition at line 213 of file BU.h.

Referenced by createMsgChain(), and exportParameters().

const int evf::BU::fedHeaderSize_ = sizeof(fedh_t) [static, private]

Definition at line 253 of file BU.h.

Referenced by createMsgChain(), and generateEvent().

xdata::UnsignedInteger32 evf::BU::fedSizeMax_ [private]

Definition at line 218 of file BU.h.

Referenced by exportParameters(), and generateEvent().

xdata::UnsignedInteger32 evf::BU::fedSizeMean_ [private]

Definition at line 219 of file BU.h.

Referenced by BU(), exportParameters(), and generateEvent().

xdata::UnsignedInteger32 evf::BU::fedSizeWidth_ [private]

Definition at line 220 of file BU.h.

Referenced by BU(), and exportParameters().

const int evf::BU::fedTrailerSize_ = sizeof(fedt_t) [static, private]

Definition at line 254 of file BU.h.

Referenced by createMsgChain(), and generateEvent().

xdata::UnsignedInteger32 evf::BU::firstEvent_ [private]

Definition at line 214 of file BU.h.

Referenced by exportParameters(), and generateEvent().

std::queue<unsigned int> evf::BU::freeIds_ [private]

Definition at line 157 of file BU.h.

Referenced by building(), halting(), I2O_BU_DISCARD_Callback(), reset(), and stopping().

const int evf::BU::frlHeaderSize_ = sizeof(frlh_t) [static, private]

Definition at line 252 of file BU.h.

Referenced by createMsgChain().

Definition at line 149 of file BU.h.

Referenced by BU(), configuring(), enabling(), exportParameters(), halting(), and stopping().

xdaq::ApplicationDescriptor* evf::BU::fuAppDesc_ [private]

Definition at line 143 of file BU.h.

Referenced by createMsgChain(), I2O_BU_ALLOCATE_Callback(), and sending().

double evf::BU::gaussianMean_ [private]

Definition at line 227 of file BU.h.

Referenced by BU(), and generateEvent().

double evf::BU::gaussianWidth_ [private]

Definition at line 228 of file BU.h.

Referenced by BU(), and generateEvent().

WebGUI* evf::BU::gui_ [private]

Definition at line 152 of file BU.h.

Referenced by actionPerformed(), BU(), exportParameters(), monitoring(), and reset().

xdata::String evf::BU::hostname_ [private]

Definition at line 186 of file BU.h.

Referenced by BU(), and exportParameters().

toolbox::mem::Pool* evf::BU::i2oPool_ [private]

Definition at line 240 of file BU.h.

Referenced by actionPerformed(), BU(), and createMsgChain().

xdata::UnsignedInteger32 evf::BU::instance_ [private]

Definition at line 185 of file BU.h.

Referenced by BU(), and exportParameters().

bool evf::BU::isBuilding_ [private]

Definition at line 163 of file BU.h.

Referenced by building(), enabling(), halting(), and startBuildingWorkLoop().

bool evf::BU::isHalting_ [private]
bool evf::BU::isSending_ [private]

Definition at line 164 of file BU.h.

Referenced by enabling(), halting(), sending(), and startSendingWorkLoop().

timeval evf::BU::lastLsUpdate_ [private]

Definition at line 225 of file BU.h.

Referenced by createMsgChain().

sem_t evf::BU::lock_ [private]

Definition at line 243 of file BU.h.

Referenced by lock(), reset(), and unlock().

Logger evf::BU::log_ [private]
xdata::Double evf::BU::memUsedInMB_ [private]

Definition at line 188 of file BU.h.

Referenced by actionPerformed(), and exportParameters().

xdata::String evf::BU::mode_ [private]

Definition at line 208 of file BU.h.

Referenced by actionPerformed(), and exportParameters().

unsigned int evf::BU::monLastN_ [private]

Definition at line 232 of file BU.h.

Referenced by monitoring(), and reset().

unsigned int evf::BU::monLastSumOfSizes_ [private]

Definition at line 234 of file BU.h.

Referenced by monitoring(), and reset().

uint64_t evf::BU::monLastSumOfSquares_ [private]

Definition at line 233 of file BU.h.

Referenced by monitoring(), and reset().

xdata::UnsignedInteger32 evf::BU::monSleepSec_ [private]

Definition at line 222 of file BU.h.

Referenced by exportParameters(), and monitoring().

struct timeval evf::BU::monStartTime_ [private]

Definition at line 231 of file BU.h.

Referenced by monitoring(), and startMonitoringWorkLoop().

xdata::UnsignedInteger32 evf::BU::msgBufferSize_ [private]

Definition at line 217 of file BU.h.

Referenced by createMsgChain(), and exportParameters().

xdata::UnsignedInteger32 evf::BU::nbEventsBuilt_ [private]

Definition at line 203 of file BU.h.

Referenced by building(), exportParameters(), generateEvent(), halting(), and monitoring().

xdata::UnsignedInteger32 evf::BU::nbEventsDiscarded_ [private]

Definition at line 205 of file BU.h.

Referenced by exportParameters(), and I2O_BU_DISCARD_Callback().

xdata::UnsignedInteger32 evf::BU::nbEventsInBU_ [private]

Definition at line 201 of file BU.h.

Referenced by exportParameters(), I2O_BU_ALLOCATE_Callback(), and sending().

xdata::UnsignedInteger32 evf::BU::nbEventsRequested_ [private]

Definition at line 202 of file BU.h.

Referenced by exportParameters(), and I2O_BU_ALLOCATE_Callback().

xdata::UnsignedInteger32 evf::BU::nbEventsSent_ [private]

Definition at line 204 of file BU.h.

Referenced by exportParameters(), and sending().

xdata::Boolean evf::BU::overwriteEvtId_ [private]

Definition at line 211 of file BU.h.

Referenced by exportParameters(), and generateEvent().

xdata::Boolean evf::BU::overwriteLsId_ [private]

Definition at line 212 of file BU.h.

Referenced by createMsgChain(), and exportParameters().

xdata::UnsignedInteger32 evf::BU::queueSize_ [private]

Definition at line 215 of file BU.h.

Referenced by exportParameters(), and reset().

xdata::Double evf::BU::rate_ [private]

Definition at line 197 of file BU.h.

Referenced by exportParameters(), monitoring(), and reset().

xdata::Boolean evf::BU::replay_ [private]

Definition at line 209 of file BU.h.

Referenced by exportParameters(), generateEvent(), and halting().

xdata::Double evf::BU::rms_ [private]

Definition at line 198 of file BU.h.

Referenced by exportParameters(), monitoring(), and reset().

std::queue<unsigned int> evf::BU::rqstIds_ [private]

Definition at line 156 of file BU.h.

Referenced by I2O_BU_ALLOCATE_Callback(), reset(), and sending().

sem_t evf::BU::rqstSem_ [private]

Definition at line 246 of file BU.h.

Referenced by postRqst(), reset(), and waitRqst().

xdata::UnsignedInteger32 evf::BU::runNumber_ [private]

Definition at line 187 of file BU.h.

Referenced by exportParameters().

sem_t evf::BU::sendSem_ [private]

Definition at line 245 of file BU.h.

Referenced by postSend(), reset(), and waitSend().

std::set<unsigned int> evf::BU::sentIds_ [private]

Definition at line 159 of file BU.h.

Referenced by I2O_BU_DISCARD_Callback(), reset(), sending(), and stopping().

std::string evf::BU::sourceId_ [private]

Definition at line 180 of file BU.h.

Referenced by BU(), startBuildingWorkLoop(), startMonitoringWorkLoop(), and startSendingWorkLoop().

unsigned int evf::BU::sumOfSizes_ [private]

Definition at line 236 of file BU.h.

Referenced by monitoring(), and sending().

uint64_t evf::BU::sumOfSquares_ [private]

Definition at line 235 of file BU.h.

Referenced by monitoring(), and sending().

xdata::Double evf::BU::throughput_ [private]

Definition at line 195 of file BU.h.

Referenced by exportParameters(), monitoring(), and reset().

xdata::String evf::BU::url_ [private]

Definition at line 183 of file BU.h.

Referenced by BU(), and exportParameters().

xdata::Boolean evf::BU::useFixedFedSize_ [private]

Definition at line 221 of file BU.h.

Referenced by exportParameters(), and generateEvent().

std::vector<unsigned int> evf::BU::validFedIds_ [private]

Definition at line 161 of file BU.h.

Referenced by createMsgChain(), enabling(), generateEvent(), and reset().

toolbox::task::WorkLoop* evf::BU::wlBuilding_ [private]

Definition at line 168 of file BU.h.

Referenced by startBuildingWorkLoop().

toolbox::task::WorkLoop* evf::BU::wlMonitoring_ [private]

Definition at line 176 of file BU.h.

Referenced by startMonitoringWorkLoop().

toolbox::task::WorkLoop* evf::BU::wlSending_ [private]

Definition at line 172 of file BU.h.

Referenced by startSendingWorkLoop().