CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Member Functions | Private Attributes | Static Private Attributes
evf::BU Class Reference

#include <BU.h>

Inheritance diagram for evf::BU:

Public Member Functions

void actionPerformed (xdata::Event &e)
 
 BU (xdaq::ApplicationStub *s)
 
bool building (toolbox::task::WorkLoop *wl)
 
bool configuring (toolbox::task::WorkLoop *wl)
 
void customWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
bool enabling (toolbox::task::WorkLoop *wl)
 
xoap::MessageReference fsmCallback (xoap::MessageReference msg) throw (xoap::exception::Exception)
 
bool halting (toolbox::task::WorkLoop *wl)
 
void I2O_BU_ALLOCATE_Callback (toolbox::mem::Reference *bufRef)
 
void I2O_BU_DISCARD_Callback (toolbox::mem::Reference *bufRef)
 
bool monitoring (toolbox::task::WorkLoop *wl)
 
bool sending (toolbox::task::WorkLoop *wl)
 
void startBuildingWorkLoop () throw (evf::Exception)
 
void startMonitoringWorkLoop () throw (evf::Exception)
 
void startSendingWorkLoop () throw (evf::Exception)
 
bool stopping (toolbox::task::WorkLoop *wl)
 
void webPageRequest (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
 XDAQ_INSTANTIATOR ()
 
virtual ~BU ()
 

Private Member Functions

toolbox::mem::Reference * createMsgChain (evf::BUEvent *evt, unsigned int fuResourceId)
 
double deltaT (const struct timeval *start, const struct timeval *end)
 
void dumpFrame (unsigned char *data, unsigned int len)
 
void exportParameters ()
 
bool generateEvent (evf::BUEvent *evt)
 
void lock ()
 
void postBuild ()
 
void postRqst ()
 
void postSend ()
 
void reset ()
 
void unlock ()
 
void waitBuild ()
 
void waitRqst ()
 
void waitSend ()
 

Private Attributes

toolbox::task::ActionSignature * asBuilding_
 
toolbox::task::ActionSignature * asMonitoring_
 
toolbox::task::ActionSignature * asSending_
 
xdata::Double average_
 
xdaq::ApplicationContext * buAppContext_
 
xdaq::ApplicationDescriptor * buAppDesc_
 
sem_t buildSem_
 
std::queue< unsigned int > builtIds_
 
xdata::String class_
 
xdata::Boolean crc_
 
xdata::UnsignedInteger32 deltaN_
 
xdata::UnsignedInteger32 deltaSumOfSizes_
 
xdata::Double deltaSumOfSquares_
 
xdata::Double deltaT_
 
xdata::UnsignedInteger32 eventBufferSize_
 
std::vector< evf::BUEvent * > events_
 
unsigned int evtNumber_
 
unsigned int fakeLs_
 
xdata::UnsignedInteger32 fakeLsUpdateSecs_
 
xdata::UnsignedInteger32 fedSizeMax_
 
xdata::UnsignedInteger32 fedSizeMean_
 
xdata::UnsignedInteger32 fedSizeWidth_
 
xdata::UnsignedInteger32 firstEvent_
 
std::queue< unsigned int > freeIds_
 
StateMachine fsm_
 
xdaq::ApplicationDescriptor * fuAppDesc_
 
double gaussianMean_
 
double gaussianWidth_
 
WebGUIgui_
 
xdata::String hostname_
 
toolbox::mem::Pool * i2oPool_
 
xdata::UnsignedInteger32 instance_
 
bool isBuilding_
 
bool isHalting_
 
bool isSending_
 
timeval lastLsUpdate_
 
sem_t lock_
 
Logger log_
 
xdata::Double memUsedInMB_
 
xdata::String mode_
 
unsigned int monLastN_
 
unsigned int monLastSumOfSizes_
 
uint64_t monLastSumOfSquares_
 
xdata::UnsignedInteger32 monSleepSec_
 
struct timeval monStartTime_
 
xdata::UnsignedInteger32 msgBufferSize_
 
xdata::UnsignedInteger32 nbEventsBuilt_
 
xdata::UnsignedInteger32 nbEventsDiscarded_
 
xdata::UnsignedInteger32 nbEventsInBU_
 
xdata::UnsignedInteger32 nbEventsRequested_
 
xdata::UnsignedInteger32 nbEventsSent_
 
xdata::Boolean overwriteEvtId_
 
xdata::Boolean overwriteLsId_
 
xdata::UnsignedInteger32 queueSize_
 
xdata::Double rate_
 
xdata::Boolean replay_
 
xdata::Double rms_
 
std::queue< unsigned int > rqstIds_
 
sem_t rqstSem_
 
xdata::UnsignedInteger32 runNumber_
 
sem_t sendSem_
 
std::set< unsigned int > sentIds_
 
std::string sourceId_
 
unsigned int sumOfSizes_
 
uint64_t sumOfSquares_
 
xdata::Double throughput_
 
xdata::String url_
 
xdata::Boolean useFixedFedSize_
 
std::vector< unsigned int > validFedIds_
 
toolbox::task::WorkLoop * wlBuilding_
 
toolbox::task::WorkLoop * wlMonitoring_
 
toolbox::task::WorkLoop * wlSending_
 

Static Private Attributes

static const int fedHeaderSize_ =sizeof(fedh_t)
 
static const int fedTrailerSize_ =sizeof(fedt_t)
 
static const int frlHeaderSize_ =sizeof(frlh_t)
 

Detailed Description

Definition at line 49 of file BU.h.

Constructor & Destructor Documentation

BU::BU ( xdaq::ApplicationStub *  s)

Definition at line 36 of file BU.cc.

References class_, crc_, customWebPage(), alignCSCRings::e, edm::hlt::Exception, exportParameters(), fedSizeMean_, fedSizeWidth_, evf::StateMachine::findRcmsStateListener(), fsm_, gaussianMean_, gaussianWidth_, gui_, hostname_, I2O_BU_ALLOCATE_Callback(), I2O_BU_DISCARD_Callback(), i2oPool_, evf::StateMachine::initialize(), instance_, create_public_lumi_plots::log, log_, mergeVDriftHistosByStation::name, evf::BUEvent::setComputeCrc(), evf::WebGUI::setLargeAppIcon(), evf::WebGUI::setSmallAppIcon(), sourceId_, mathSSE::sqrt(), startMonitoringWorkLoop(), url_, and webPageRequest().

37  : xdaq::Application(s)
38  , log_(getApplicationLogger())
39  , buAppDesc_(getApplicationDescriptor())
40  , fuAppDesc_(0)
41  , buAppContext_(getApplicationContext())
42  , fsm_(this)
43  , gui_(0)
44  , evtNumber_(0)
45  , isBuilding_(false)
46  , isSending_(false)
47  , isHalting_(false)
48  , wlBuilding_(0)
49  , asBuilding_(0)
50  , wlSending_(0)
51  , asSending_(0)
52  , wlMonitoring_(0)
53  , asMonitoring_(0)
54  , instance_(0)
55  , runNumber_(0)
56  , memUsedInMB_(0.0)
57  , deltaT_(0.0)
58  , deltaN_(0)
60  , deltaSumOfSizes_(0)
61  , throughput_(0.0)
62  , average_(0.0)
63  , rate_(0.0)
64  , rms_(0.0)
65  , nbEventsInBU_(0)
67  , nbEventsBuilt_(0)
68  , nbEventsSent_(0)
70  , mode_("RANDOM")
71  , replay_(false)
72  , crc_(true)
73  , overwriteEvtId_(true)
74  , overwriteLsId_(false)
75  , fakeLsUpdateSecs_(23)
76  , firstEvent_(1)
77  , queueSize_(32)
78  , eventBufferSize_(0x400000)
79  , msgBufferSize_(32768)
80  , fedSizeMax_(65536)
81  , fedSizeMean_(1024)
82  , fedSizeWidth_(1024)
83  , useFixedFedSize_(false)
84  , monSleepSec_(1)
85  , fakeLs_(0)
86  , gaussianMean_(0.0)
87  , gaussianWidth_(1.0)
88  , monLastN_(0)
91  , sumOfSquares_(0)
92  , sumOfSizes_(0)
93  , i2oPool_(0)
94 {
95  // initialize state machine
96  fsm_.initialize<evf::BU>(this);
97 
98  // initialize application info
99  url_ =
100  getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
101  getApplicationDescriptor()->getURN();
102  class_ =getApplicationDescriptor()->getClassName();
103  instance_=getApplicationDescriptor()->getInstance();
104  hostname_=getApplicationDescriptor()->getContextDescriptor()->getURL();
105  sourceId_=class_.toString()+instance_.toString();
106 
107  // i2o callbacks
108  i2o::bind(this,&BU::I2O_BU_ALLOCATE_Callback,I2O_BU_ALLOCATE,XDAQ_ORGANIZATION_ID);
109  i2o::bind(this,&BU::I2O_BU_DISCARD_Callback, I2O_BU_DISCARD, XDAQ_ORGANIZATION_ID);
110 
111  // allocate i2o memery pool
112  string i2oPoolName=sourceId_+"_i2oPool";
113  try {
114  toolbox::mem::HeapAllocator *allocator=new toolbox::mem::HeapAllocator();
115  toolbox::net::URN urn("toolbox-mem-pool",i2oPoolName);
116  toolbox::mem::MemoryPoolFactory* poolFactory=
117  toolbox::mem::getMemoryPoolFactory();
118  i2oPool_=poolFactory->createPool(urn,allocator);
119  }
121  string s="Failed to create pool: "+i2oPoolName;
122  LOG4CPLUS_FATAL(log_,s);
123  XCEPT_RETHROW(xcept::Exception,s,e);
124  }
125 
126  // web interface
127  xgi::bind(this,&evf::BU::webPageRequest,"Default");
128  gui_=new WebGUI(this,&fsm_);
129  gui_->setSmallAppIcon("/rubuilder/bu/images/bu32x32.gif");
130  gui_->setLargeAppIcon("/rubuilder/bu/images/bu64x64.gif");
131 
132  vector<toolbox::lang::Method*> methods=gui_->getMethods();
133  vector<toolbox::lang::Method*>::iterator it;
134  for (it=methods.begin();it!=methods.end();++it) {
135  if ((*it)->type()=="cgi") {
136  string name=static_cast<xgi::MethodSignature*>(*it)->name();
137  xgi::bind(this,&evf::BU::webPageRequest,name);
138  }
139  }
140  xgi::bind(this,&evf::BU::customWebPage,"customWebPage");
141 
142 
143  // export parameters to info space(s)
145 
146  // findRcmsStateListener
148 
149  // compute parameters for fed size generation (a la Emilio)
152  (0.5*
153  (1+std::sqrt
154  (1.0+4.0*
155  fedSizeWidth_.value_*fedSizeWidth_.value_/
156  fedSizeMean_.value_/fedSizeMean_.value_))));
157 
158  // start monitoring thread, once and for all
160 
161  // propagate crc flag to BUEvent
163 }
void setLargeAppIcon(CString_t &icon)
Definition: WebGUI.h:74
xdata::UnsignedInteger32 nbEventsInBU_
Definition: BU.h:201
void webPageRequest(xgi::Input *in, xgi::Output *out)
Definition: BU.cc:402
xdata::Boolean overwriteEvtId_
Definition: BU.h:211
toolbox::task::ActionSignature * asMonitoring_
Definition: BU.h:177
xdata::Double memUsedInMB_
Definition: BU.h:188
static void setComputeCrc(bool computeCrc)
Definition: BUEvent.h:37
xdata::Boolean replay_
Definition: BU.h:209
double gaussianWidth_
Definition: BU.h:228
xdata::String mode_
Definition: BU.h:208
xdata::UnsignedInteger32 msgBufferSize_
Definition: BU.h:217
xdata::String class_
Definition: BU.h:184
StateMachine fsm_
Definition: BU.h:149
xdaq::ApplicationContext * buAppContext_
Definition: BU.h:146
xdata::Boolean useFixedFedSize_
Definition: BU.h:221
Logger log_
Definition: BU.h:137
xdata::UnsignedInteger32 fedSizeWidth_
Definition: BU.h:220
bool isSending_
Definition: BU.h:164
void setSmallAppIcon(CString_t &icon)
Definition: WebGUI.h:75
xdata::Double rate_
Definition: BU.h:197
xdata::UnsignedInteger32 instance_
Definition: BU.h:185
unsigned int evtNumber_
Definition: BU.h:160
xdata::Boolean overwriteLsId_
Definition: BU.h:212
xdata::Double throughput_
Definition: BU.h:195
xdata::UnsignedInteger32 fakeLsUpdateSecs_
Definition: BU.h:213
void startMonitoringWorkLoop()
Definition: BU.cc:539
void initialize(T *app)
Definition: StateMachine.h:84
void customWebPage(xgi::Input *in, xgi::Output *out)
Definition: BU.cc:412
xdata::Double deltaSumOfSquares_
Definition: BU.h:192
WebGUI * gui_
Definition: BU.h:152
unsigned int sumOfSizes_
Definition: BU.h:236
toolbox::task::ActionSignature * asBuilding_
Definition: BU.h:169
toolbox::task::WorkLoop * wlSending_
Definition: BU.h:172
xdata::UnsignedInteger32 nbEventsSent_
Definition: BU.h:204
void I2O_BU_ALLOCATE_Callback(toolbox::mem::Reference *bufRef)
Definition: BU.cc:319
xdata::UnsignedInteger32 nbEventsRequested_
Definition: BU.h:202
xdata::Double rms_
Definition: BU.h:198
Definition: BU.h:49
T sqrt(T t)
Definition: SSEVec.h:46
uint64_t sumOfSquares_
Definition: BU.h:235
std::string sourceId_
Definition: BU.h:180
toolbox::task::WorkLoop * wlBuilding_
Definition: BU.h:168
bool isBuilding_
Definition: BU.h:163
xdata::Boolean crc_
Definition: BU.h:210
xdaq::ApplicationDescriptor * buAppDesc_
Definition: BU.h:140
xdata::Double deltaT_
Definition: BU.h:190
double gaussianMean_
Definition: BU.h:227
bool isHalting_
Definition: BU.h:165
xdata::UnsignedInteger32 monSleepSec_
Definition: BU.h:222
toolbox::task::ActionSignature * asSending_
Definition: BU.h:173
xdata::UnsignedInteger32 deltaN_
Definition: BU.h:191
toolbox::task::WorkLoop * wlMonitoring_
Definition: BU.h:176
xdata::String hostname_
Definition: BU.h:186
xdata::UnsignedInteger32 fedSizeMax_
Definition: BU.h:218
void exportParameters()
Definition: BU.cc:629
xdata::Double average_
Definition: BU.h:196
toolbox::mem::Pool * i2oPool_
Definition: BU.h:240
xdata::UnsignedInteger32 deltaSumOfSizes_
Definition: BU.h:193
xdata::UnsignedInteger32 runNumber_
Definition: BU.h:187
xdata::UnsignedInteger32 eventBufferSize_
Definition: BU.h:216
xdata::UnsignedInteger32 fedSizeMean_
Definition: BU.h:219
void I2O_BU_DISCARD_Callback(toolbox::mem::Reference *bufRef)
Definition: BU.cc:353
xdaq::ApplicationDescriptor * fuAppDesc_
Definition: BU.h:143
unsigned int monLastN_
Definition: BU.h:232
unsigned int monLastSumOfSizes_
Definition: BU.h:234
uint64_t monLastSumOfSquares_
Definition: BU.h:233
xdata::UnsignedInteger32 nbEventsDiscarded_
Definition: BU.h:205
xdata::UnsignedInteger32 queueSize_
Definition: BU.h:215
unsigned int fakeLs_
Definition: BU.h:224
xdata::UnsignedInteger32 nbEventsBuilt_
Definition: BU.h:203
xdata::String url_
Definition: BU.h:183
void findRcmsStateListener()
xdata::UnsignedInteger32 firstEvent_
Definition: BU.h:214
BU::~BU ( )
virtual

Definition at line 167 of file BU.cc.

References events_.

168 {
169  while (!events_.empty()) { delete events_.back(); events_.pop_back(); }
170 }
std::vector< evf::BUEvent * > events_
Definition: BU.h:155

Member Function Documentation

void BU::actionPerformed ( xdata::Event &  e)

Definition at line 385 of file BU.cc.

References crc_, alignCSCRings::e, gui_, i2oPool_, PlaybackRawDataProvider::instance(), memUsedInMB_, mode_, evf::WebGUI::monInfoSpace(), and evf::BUEvent::setComputeCrc().

386 {
387  gui_->monInfoSpace()->lock();
388  if (e.type()=="urn:xdata-event:ItemGroupRetrieveEvent") {
389  mode_=(0==PlaybackRawDataProvider::instance())?"RANDOM":"PLAYBACK";
390  if (0!=i2oPool_) memUsedInMB_=i2oPool_->getMemoryUsage().getUsed()*9.53674e-07;
391  else memUsedInMB_=0.0;
392  }
393  else if (e.type()=="ItemChangedEvent") {
394  string item=dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
395  if (item=="crc") BUEvent::setComputeCrc(crc_.value_);
396  }
397  gui_->monInfoSpace()->unlock();
398 }
xdata::Double memUsedInMB_
Definition: BU.h:188
static void setComputeCrc(bool computeCrc)
Definition: BUEvent.h:37
xdata::String mode_
Definition: BU.h:208
WebGUI * gui_
Definition: BU.h:152
static PlaybackRawDataProvider * instance()
xdata::Boolean crc_
Definition: BU.h:210
xdata::InfoSpace * monInfoSpace()
Definition: WebGUI.h:72
toolbox::mem::Pool * i2oPool_
Definition: BU.h:240
bool BU::building ( toolbox::task::WorkLoop *  wl)

Definition at line 441 of file BU.cc.

References builtIds_, events_, freeIds_, generateEvent(), isBuilding_, isHalting_, lock(), log_, nbEventsBuilt_, postSend(), unlock(), and waitBuild().

Referenced by startBuildingWorkLoop().

442 {
443  waitBuild();
444  lock();
445  unsigned int buResourceId=freeIds_.front(); freeIds_.pop();
446  unlock();
447 
448  if (buResourceId>=(uint32_t)events_.size()) {
449  LOG4CPLUS_INFO(log_,"shutdown 'building' workloop.");
450  isBuilding_=false;
451  return false;
452  }
453 
454  if (!isHalting_) {
455  BUEvent* evt=events_[buResourceId];
456  if(generateEvent(evt)) {
457  lock();
458  nbEventsBuilt_++;
459  builtIds_.push(buResourceId);
460  unlock();
461 
462  postSend();
463  }
464  else {
465  LOG4CPLUS_INFO(log_,"building:received null post");
466  lock();
467  unsigned int saveBUResourceId = buResourceId;
468  //buResourceId = freeIds_.front(); freeIds_.pop();
469  freeIds_.push(saveBUResourceId);
470  unlock();
471  isBuilding_=false;
472  return false;
473  }
474  }
475  return true;
476 }
void unlock()
Definition: BU.h:111
std::queue< unsigned int > builtIds_
Definition: BU.h:158
Logger log_
Definition: BU.h:137
void lock()
Definition: BU.h:110
bool isBuilding_
Definition: BU.h:163
bool isHalting_
Definition: BU.h:165
bool generateEvent(evf::BUEvent *evt)
Definition: BU.cc:747
void waitBuild()
Definition: BU.h:112
std::vector< evf::BUEvent * > events_
Definition: BU.h:155
std::queue< unsigned int > freeIds_
Definition: BU.h:157
void postSend()
Definition: BU.h:115
xdata::UnsignedInteger32 nbEventsBuilt_
Definition: BU.h:203
bool BU::configuring ( toolbox::task::WorkLoop *  wl)

Definition at line 178 of file BU.cc.

References alignCSCRings::e, edm::hlt::Exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, isHalting_, log_, lumiQueryAPI::msg, and reset().

179 {
180  isHalting_=false;
181  try {
182  LOG4CPLUS_INFO(log_,"Start configuring ...");
183  reset();
184  LOG4CPLUS_INFO(log_,"Finished configuring!");
185  fsm_.fireEvent("ConfigureDone",this);
186  }
187  catch (xcept::Exception &e) {
188  string msg = "configuring FAILED: " + (string)e.what();
189  fsm_.fireFailed(msg,this);
190  }
191 
192  return false;
193 }
StateMachine fsm_
Definition: BU.h:149
Logger log_
Definition: BU.h:137
void fireFailed(const std::string &errorMsg, void *originator)
void reset()
Definition: BU.cc:685
bool isHalting_
Definition: BU.h:165
void fireEvent(const std::string &evtType, void *originator)
toolbox::mem::Reference * BU::createMsgChain ( evf::BUEvent evt,
unsigned int  fuResourceId 
)
private

remainder>0 means that a partial fed is left over from the last block

no remaining fed data

Definition at line 805 of file BU.cc.

References Association::block, buAppDesc_, evf::BUEvent::buResourceId(), alignCSCRings::e, evf::evtn::evm_board_sense(), evf::evtn::EVM_TCS_LSBLNR_OFFSET, evf::BUEvent::evtNumber(), edm::hlt::Exception, fakeLs_, fakeLsUpdateSecs_, evf::BUEvent::fedAddr(), fedHeaderSize_, evf::BUEvent::fedId(), evf::BUEvent::fedSize(), fedTrailerSize_, frlHeaderSize_, fuAppDesc_, evf::evtn::GTPE_ORBTNR_OFFSET, i, i2oPool_, gen::k, prof2calltree::last, lastLsUpdate_, log_, FEDNumbering::MINTriggerEGTPFEDID, FEDNumbering::MINTriggerGTPFEDID, msgBufferSize_, GetRecoTauVFromDQM_MC_cff::next, evf::BUEvent::nFed(), overwriteLsId_, evf::evtn::SLINK_HALFWORD_SIZE, validFedIds_, and warning.

Referenced by sending().

807 {
808  unsigned int msgHeaderSize =sizeof(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME);
809  unsigned int msgPayloadSize=msgBufferSize_-msgHeaderSize;
810 
811  if((msgPayloadSize%4)!=0) LOG4CPLUS_ERROR(log_,"Invalid Payload Size.");
812 
813  /*Overwrite lumisection value stored in the event*/
814  if (overwriteLsId_.value_) {
815  //getting new time and increase LS if past 23 sec
816  struct timezone tz;
817  if (!fakeLs_) {
818  fakeLs_++;
819  gettimeofday(&lastLsUpdate_,&tz);
820  }
821  else {
822  timeval newLsUpdate;
823  gettimeofday(&newLsUpdate,&tz);
824  if ((unsigned long)1000000*newLsUpdate.tv_sec+newLsUpdate.tv_usec
825  - (unsigned long)1000000*lastLsUpdate_.tv_sec+lastLsUpdate_.tv_usec
826  >= fakeLsUpdateSecs_.value_*1000000)
827  {
828  fakeLs_++;
829  lastLsUpdate_=newLsUpdate;
830  }
831  }
832 
833  int gtpFedPos_=-1;
834  int egtpFedPos_=-1;
835  for (size_t k=0;k<validFedIds_.size();k++) {
837  //insert ls value into gtp fed
838  unsigned char * fgtpAddr = evt->fedAddr(k);
839  unsigned int fgtpSize = evt->fedSize(k);
840  if (fgtpAddr && fgtpSize) {
841  gtpFedPos_=(int)k;
842  evtn::evm_board_sense(fgtpAddr,fgtpSize);
843  *((unsigned short*)fgtpAddr
844  +sizeof(fedh_t)/sizeof(unsigned short)
845  + (evtn::EVM_GTFE_BLOCK*2 + evtn::EVM_TCS_LSBLNR_OFFSET)*evtn::SLINK_HALFWORD_SIZE /sizeof(unsigned short)
846  ) = (unsigned short)fakeLs_-1;
847  }
848  }
850  //insert orbit value into gtpe fed
851  unsigned char * fegtpAddr = evt->fedAddr(egtpFedPos_);
852  unsigned int fegtpSize = evt->fedSize(egtpFedPos_);
853  if (fegtpAddr && fegtpSize) {
854  egtpFedPos_=(int)k;
855  *( (unsigned int*)fegtpAddr + evtn::GTPE_ORBTNR_OFFSET * evtn::SLINK_HALFWORD_SIZE/sizeof(unsigned int)
856  ) = (unsigned int)(fakeLs_-1)*0x00100000;
857  }
858  }
859  }
860  if (gtpFedPos_<0) LOG4CPLUS_ERROR(log_,"Unable to find GTP FED in event!");
861  if (egtpFedPos_<0 && gtpFedPos_<0) LOG4CPLUS_ERROR(log_,"Unable to find GTP or GTPE FED in event!");
862  }
863 
864  toolbox::mem::Reference *head =0;
865  toolbox::mem::Reference *tail =0;
866  toolbox::mem::Reference *bufRef=0;
867 
868  I2O_MESSAGE_FRAME *stdMsg=0;
869  I2O_PRIVATE_MESSAGE_FRAME *pvtMsg=0;
870  I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block =0;
871 
872  unsigned int iFed =0;
873  unsigned int nSuperFrag =64;
874  unsigned int nFedPerSuperFrag=validFedIds_.size()/nSuperFrag;
875  unsigned int nBigSuperFrags =validFedIds_.size()%nSuperFrag;
876 
877  if (evt->nFed()<nSuperFrag) {
878  nSuperFrag=evt->nFed();
879  nFedPerSuperFrag=1;
880  nBigSuperFrags=0;
881  }
882  else
883  {
884  nFedPerSuperFrag=evt->nFed()/nSuperFrag;
885  nBigSuperFrags =evt->nFed()%nSuperFrag;
886  }
887  // loop over all super fragments
888  for (unsigned int iSuperFrag=0;iSuperFrag<nSuperFrag;iSuperFrag++) {
889 
890  // compute index of last fed in this super fragment
891  unsigned int nFed=iFed+nFedPerSuperFrag;
892  if (iSuperFrag<nBigSuperFrags) ++nFed;
893 
894  // compute number of blocks in this super fragment
895  unsigned int nBlock =0;
896  unsigned int curbSize=frlHeaderSize_;
897  unsigned int totSize =curbSize;
898  for (unsigned int i=iFed;i<nFed;i++) {
899  curbSize+=evt->fedSize(i);
900  totSize+=evt->fedSize(i);
901  if (curbSize>msgPayloadSize) {
902  curbSize+=frlHeaderSize_*(curbSize/msgPayloadSize);
903  if(curbSize%msgPayloadSize)totSize+=frlHeaderSize_*(curbSize/msgPayloadSize);
904  else totSize+=frlHeaderSize_*((curbSize/msgPayloadSize)-1);
905  curbSize=curbSize%msgPayloadSize;
906  }
907  }
908  nBlock=totSize/msgPayloadSize+(totSize%msgPayloadSize>0 ? 1 : 0);
909 
910 
911  // loop over all blocks (msgs) in the current super fragment
912  unsigned int remainder =0;
913  bool fedTrailerLeft=false;
914  bool last =false;
915  bool warning =false;
916  unsigned char *startOfPayload=0;
917  U32 payload(0);
918 
919  for(unsigned int iBlock=0;iBlock<nBlock;iBlock++) {
920 
921  // If last block and its partial (there can be only 0 or 1 partial)
922  payload=msgPayloadSize;
923 
924  // Allocate memory for a fragment block / message
925  try {
926  bufRef=toolbox::mem::getMemoryPoolFactory()->getFrame(i2oPool_,
928  }
929  catch(xcept::Exception &e) {
930  LOG4CPLUS_FATAL(log_,"xdaq::frameAlloc failed");
931  }
932 
933  // Fill in the fields of the fragment block / message
934  stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
935  pvtMsg=(I2O_PRIVATE_MESSAGE_FRAME*)stdMsg;
936  block =(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)stdMsg;
937 
938  pvtMsg->XFunctionCode =I2O_FU_TAKE;
939  pvtMsg->OrganizationID =XDAQ_ORGANIZATION_ID;
940 
941  stdMsg->MessageSize =(msgHeaderSize + payload) >> 2;
942  stdMsg->Function =I2O_PRIVATE_MESSAGE;
943  stdMsg->VersionOffset =0;
944  stdMsg->MsgFlags =0;
945  stdMsg->InitiatorAddress=i2o::utils::getAddressMap()->getTid(buAppDesc_);
946  stdMsg->TargetAddress =i2o::utils::getAddressMap()->getTid(fuAppDesc_);
947 
948  block->buResourceId =evt->buResourceId();
949  block->fuTransactionId =fuResourceId;
950  block->blockNb =iBlock;
951  block->nbBlocksInSuperFragment=nBlock;
952  block->superFragmentNb =iSuperFrag;
953  block->nbSuperFragmentsInEvent=nSuperFrag;
954  block->eventNumber =evt->evtNumber();
955 
956  // Fill in payload
957  startOfPayload =(unsigned char*)block+msgHeaderSize;
958  frlh_t* frlHeader=(frlh_t*)startOfPayload;
959  frlHeader->trigno=evt->evtNumber();
960  frlHeader->segno =iBlock;
961 
962  unsigned char *startOfFedBlocks=startOfPayload+frlHeaderSize_;
963  payload -=frlHeaderSize_;
964  frlHeader->segsize =payload;
965  unsigned int leftspace=payload;
966 
967  // a fed trailer was left over from the previous block
968  if(fedTrailerLeft) {
969  memcpy(startOfFedBlocks,
970  evt->fedAddr(iFed)+evt->fedSize(iFed)-fedTrailerSize_,
972 
973  startOfFedBlocks+=fedTrailerSize_;
974  leftspace -=fedTrailerSize_;
975  remainder =0;
976  fedTrailerLeft =false;
977 
978  // if this is the last fed, adjust block (msg) size and set last=true
979  if((iFed==nFed-1) && !last) {
980  frlHeader->segsize-=leftspace;
981  int msgSize=stdMsg->MessageSize << 2;
982  msgSize -=leftspace;
983  bufRef->setDataSize(msgSize);
984  stdMsg->MessageSize = msgSize >> 2;
985  frlHeader->segsize=frlHeader->segsize | FRL_LAST_SEGM;
986  last=true;
987  }
988 
989  // !! increment iFed !!
990  iFed++;
991  }
992 
996  if (remainder>0) {
997 
998  // the remaining fed fits entirely into the new block
999  if(payload>=remainder) {
1000  memcpy(startOfFedBlocks,
1001  evt->fedAddr(iFed)+evt->fedSize(iFed)-remainder,
1002  remainder);
1003 
1004  startOfFedBlocks+=remainder;
1005  leftspace -=remainder;
1006 
1007  // if this is the last fed in the superfragment, earmark it
1008  if(iFed==nFed-1) {
1009  frlHeader->segsize-=leftspace;
1010  int msgSize=stdMsg->MessageSize << 2;
1011  msgSize -=leftspace;
1012  bufRef->setDataSize(msgSize);
1013  stdMsg->MessageSize = msgSize >> 2;
1014  frlHeader->segsize=frlHeader->segsize | FRL_LAST_SEGM;
1015  last=true;
1016  }
1017 
1018  // !! increment iFed !!
1019  iFed++;
1020 
1021  // start new fed -> set remainder to 0!
1022  remainder=0;
1023  }
1024  // the remaining payload fits, but not the fed trailer
1025  else if (payload>=(remainder-fedTrailerSize_)) {
1026  memcpy(startOfFedBlocks,
1027  evt->fedAddr(iFed)+evt->fedSize(iFed)-remainder,
1028  remainder-fedTrailerSize_);
1029 
1030  frlHeader->segsize=remainder-fedTrailerSize_;
1031  fedTrailerLeft =true;
1032  leftspace -=(remainder-fedTrailerSize_);
1033  remainder =fedTrailerSize_;
1034  }
1035  // the remaining payload fits only partially, fill whole block
1036  else {
1037  memcpy(startOfFedBlocks,
1038  evt->fedAddr(iFed)+evt->fedSize(iFed)-remainder,payload);
1039  remainder-=payload;
1040  leftspace =0;
1041  }
1042  }
1043 
1047  if(remainder==0) {
1048 
1049  // loop on feds
1050  while(iFed<nFed) {
1051 
1052  // if the next header does not fit, jump to following block
1053  if((int)leftspace<fedHeaderSize_) {
1054  frlHeader->segsize-=leftspace;
1055  break;
1056  }
1057 
1058  memcpy(startOfFedBlocks,evt->fedAddr(iFed),fedHeaderSize_);
1059 
1060  leftspace -=fedHeaderSize_;
1061  startOfFedBlocks+=fedHeaderSize_;
1062 
1063  // fed fits with its trailer
1064  if(evt->fedSize(iFed)-fedHeaderSize_<=leftspace) {
1065  memcpy(startOfFedBlocks,
1066  evt->fedAddr(iFed)+fedHeaderSize_,
1067  evt->fedSize(iFed)-fedHeaderSize_);
1068 
1069  leftspace -=(evt->fedSize(iFed)-fedHeaderSize_);
1070  startOfFedBlocks+=(evt->fedSize(iFed)-fedHeaderSize_);
1071  }
1072  // fed payload fits only without fed trailer
1073  else if(evt->fedSize(iFed)-fedHeaderSize_-fedTrailerSize_<=leftspace) {
1074  memcpy(startOfFedBlocks,
1075  evt->fedAddr(iFed)+fedHeaderSize_,
1077 
1078  leftspace -=(evt->fedSize(iFed)-fedHeaderSize_-fedTrailerSize_);
1079  frlHeader->segsize-=leftspace;
1080  fedTrailerLeft =true;
1081  remainder =fedTrailerSize_;
1082 
1083  break;
1084  }
1085  // fed payload fits only partially
1086  else {
1087  memcpy(startOfFedBlocks,evt->fedAddr(iFed)+fedHeaderSize_,leftspace);
1088  remainder=evt->fedSize(iFed)-fedHeaderSize_-leftspace;
1089  leftspace=0;
1090 
1091  break;
1092  }
1093 
1094  // !! increase iFed !!
1095  iFed++;
1096 
1097  } // while (iFed<fedN_)
1098 
1099  // earmark the last block
1100  if (iFed==nFed && remainder==0 && !last) {
1101  frlHeader->segsize-=leftspace;
1102  int msgSize=stdMsg->MessageSize << 2;
1103  msgSize -=leftspace;
1104  bufRef->setDataSize(msgSize);
1105  stdMsg->MessageSize=msgSize >> 2;
1106  frlHeader->segsize =frlHeader->segsize | FRL_LAST_SEGM;
1107  last=true;
1108  }
1109 
1110  } // if (remainder==0)
1111 
1112  if(iSuperFrag==0&&iBlock==0) { // This is the first fragment block / message
1113  head=bufRef;
1114  tail=bufRef;
1115  }
1116  else {
1117  tail->setNextReference(bufRef);
1118  tail=bufRef;
1119  }
1120 
1121  if((iBlock==nBlock-1) && remainder!=0) {
1122  nBlock++;
1123  warning=true;
1124  }
1125 
1126  } // for (iBlock)
1127 
1128  // fix case where block estimate was wrong
1129  if(warning) {
1130  toolbox::mem::Reference* next=head;
1131  do {
1132  block =(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)next->getDataLocation();
1133  if (block->superFragmentNb==iSuperFrag)
1134  block->nbBlocksInSuperFragment=nBlock;
1135  } while((next=next->getNextReference()));
1136  }
1137 
1138  } // iSuperFrag < nSuperFrag
1139 
1140  return head; // return the top of the chain
1141 }
int i
Definition: DBlmapReader.cc:9
const unsigned int EVM_TCS_LSBLNR_OFFSET
unsigned int evtNumber() const
Definition: BUEvent.h:28
xdata::UnsignedInteger32 msgBufferSize_
Definition: BU.h:217
unsigned int buResourceId() const
Definition: BUEvent.h:27
Logger log_
Definition: BU.h:137
static const int frlHeaderSize_
Definition: BU.h:252
xdata::Boolean overwriteLsId_
Definition: BU.h:212
xdata::UnsignedInteger32 fakeLsUpdateSecs_
Definition: BU.h:213
const unsigned int SLINK_HALFWORD_SIZE
Definition: FEDConstants.h:7
static const int fedTrailerSize_
Definition: BU.h:254
unsigned char * fedAddr(unsigned int i) const
Definition: BUEvent.cc:146
xdaq::ApplicationDescriptor * buAppDesc_
Definition: BU.h:140
unsigned int fedId(unsigned int i) const
Definition: BUEvent.h:32
block
Formating index page&#39;s pieces.
Definition: Association.py:232
bool evm_board_sense(const unsigned char *p, size_t size)
int k[5][pyjets_maxn]
std::vector< unsigned int > validFedIds_
Definition: BU.h:161
toolbox::mem::Pool * i2oPool_
Definition: BU.h:240
unsigned int fedSize(unsigned int i) const
Definition: BUEvent.h:33
xdaq::ApplicationDescriptor * fuAppDesc_
Definition: BU.h:143
timeval lastLsUpdate_
Definition: BU.h:225
static const int fedHeaderSize_
Definition: BU.h:253
unsigned int nFed() const
Definition: BUEvent.h:31
unsigned int fakeLs_
Definition: BU.h:224
const unsigned int GTPE_ORBTNR_OFFSET
void BU::customWebPage ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)

Definition at line 412 of file BU.cc.

References dbtoconf::out.

Referenced by BU().

414 {
415  *out<<"<html></html>"<<endl;
416 }
tuple out
Definition: dbtoconf.py:99
double BU::deltaT ( const struct timeval *  start,
const struct timeval *  end 
)
private

Definition at line 727 of file BU.cc.

Referenced by monitoring().

728 {
729  unsigned int sec;
730  unsigned int usec;
731 
732  sec = end->tv_sec - start->tv_sec;
733 
734  if(end->tv_usec > start->tv_usec) {
735  usec = end->tv_usec - start->tv_usec;
736  }
737  else {
738  sec--;
739  usec = 1000000 - ((unsigned int )(start->tv_usec - end->tv_usec));
740  }
741 
742  return ((double)sec) + ((double)usec) / 1000000.0;
743 }
#define end
Definition: vmac.h:38
void BU::dumpFrame ( unsigned char *  data,
unsigned int  len 
)
private

Definition at line 1144 of file BU.cc.

References trackerHits::c, i, and pos.

1145 {
1146  char left1[20];
1147  char left2[20];
1148  char right1[20];
1149  char right2[20];
1150 
1151  printf("Byte 0 1 2 3 4 5 6 7\n");
1152 
1153  int c(0);
1154  int pos(0);
1155 
1156  for (unsigned int i=0;i<(len/8);i++) {
1157  int rpos(0);
1158  int off(3);
1159  for (pos=0;pos<12;pos+=3) {
1160  sprintf(&left1[pos],"%2.2x ",
1161  ((unsigned char*)data)[c+off]);
1162  sprintf(&right1[rpos],"%1c",
1163  ((data[c+off] > 32)&&(data[c+off] < 127)) ? data[c+off] : '.');
1164  sprintf (&left2[pos],"%2.2x ",
1165  ((unsigned char*)data)[c+off+4]);
1166  sprintf (&right2[rpos],"%1c",
1167  ((data[c+off+4] > 32)&&(data[c+off+4]<127)) ? data[c+off+4] : '.');
1168  rpos++;
1169  off--;
1170  }
1171  c+=8;
1172 
1173  printf ("%4d: %s%s || %s%s %p\n",
1174  c-8, left1, left2, right1, right2, &data[c-8]);
1175  }
1176 
1177  fflush(stdout);
1178 }
int i
Definition: DBlmapReader.cc:9
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
bool BU::enabling ( toolbox::task::WorkLoop *  wl)

Definition at line 197 of file BU.cc.

References alignCSCRings::e, edm::hlt::Exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, i, FEDNumbering::inRange(), FEDNumbering::inRangeNoGT(), PlaybackRawDataProvider::instance(), isBuilding_, isHalting_, isSending_, log_, FEDNumbering::MAXFEDID, lumiQueryAPI::msg, startBuildingWorkLoop(), startSendingWorkLoop(), and validFedIds_.

198 {
199  isHalting_=false;
200  try {
201  LOG4CPLUS_INFO(log_,"Start enabling ...");
202  // determine valid fed ids (assumes Playback EP is already configured hence PBRDP::instance
203  // not null in case we are playing back)
205  for (unsigned int i=0;i<(unsigned int)FEDNumbering::MAXFEDID+1;i++)
206  if (FEDNumbering::inRange(i)) validFedIds_.push_back(i);
207  }
208  else{
209  for (unsigned int i=0;i<(unsigned int)FEDNumbering::MAXFEDID+1;i++)
210  if (FEDNumbering::inRangeNoGT(i)) validFedIds_.push_back(i);
211  }
214  LOG4CPLUS_INFO(log_,"Finished enabling!");
215  fsm_.fireEvent("EnableDone",this);
216  }
217  catch (xcept::Exception &e) {
218  string msg = "enabling FAILED: " + (string)e.what();
219  fsm_.fireFailed(msg,this);
220  }
221 
222  return false;
223 }
int i
Definition: DBlmapReader.cc:9
StateMachine fsm_
Definition: BU.h:149
Logger log_
Definition: BU.h:137
bool isSending_
Definition: BU.h:164
void startBuildingWorkLoop()
Definition: BU.cc:420
void fireFailed(const std::string &errorMsg, void *originator)
static PlaybackRawDataProvider * instance()
bool isBuilding_
Definition: BU.h:163
bool isHalting_
Definition: BU.h:165
void fireEvent(const std::string &evtType, void *originator)
std::vector< unsigned int > validFedIds_
Definition: BU.h:161
static bool inRange(int)
void startSendingWorkLoop()
Definition: BU.cc:480
static bool inRangeNoGT(int)
void BU::exportParameters ( )
private

Definition at line 629 of file BU.cc.

References evf::WebGUI::addItemChangedListener(), evf::WebGUI::addMonitorCounter(), evf::WebGUI::addMonitorParam(), evf::WebGUI::addStandardParam(), average_, class_, crc_, deltaN_, deltaSumOfSizes_, deltaSumOfSquares_, deltaT_, eventBufferSize_, evf::WebGUI::exportParameters(), fakeLsUpdateSecs_, fedSizeMax_, fedSizeMean_, fedSizeWidth_, firstEvent_, evf::StateMachine::foundRcmsStateListener(), fsm_, gui_, hostname_, instance_, log_, memUsedInMB_, mode_, monSleepSec_, msgBufferSize_, nbEventsBuilt_, nbEventsDiscarded_, nbEventsInBU_, nbEventsRequested_, nbEventsSent_, overwriteEvtId_, overwriteLsId_, queueSize_, rate_, evf::StateMachine::rcmsStateListener(), replay_, rms_, runNumber_, evf::StateMachine::stateName(), throughput_, url_, and useFixedFedSize_.

Referenced by BU().

630 {
631  if (0==gui_) {
632  LOG4CPLUS_ERROR(log_,"No GUI, can't export parameters");
633  return;
634  }
635 
636  gui_->addMonitorParam("url", &url_);
637  gui_->addMonitorParam("class", &class_);
638  gui_->addMonitorParam("instance", &instance_);
639  gui_->addMonitorParam("hostname", &hostname_);
640  gui_->addMonitorParam("runNumber", &runNumber_);
641  gui_->addMonitorParam("stateName", fsm_.stateName());
642  gui_->addMonitorParam("memUsedInMB", &memUsedInMB_);
643  gui_->addMonitorParam("deltaT", &deltaT_);
644  gui_->addMonitorParam("deltaN", &deltaN_);
645  gui_->addMonitorParam("deltaSumOfSquares", &deltaSumOfSquares_);
646  gui_->addMonitorParam("deltaSumOfSizes", &deltaSumOfSizes_);
647  gui_->addMonitorParam("throughput", &throughput_);
648  gui_->addMonitorParam("average", &average_);
649  gui_->addMonitorParam("rate", &rate_);
650  gui_->addMonitorParam("rms", &rms_);
651 
652  gui_->addMonitorCounter("nbEvtsInBU", &nbEventsInBU_);
653  gui_->addMonitorCounter("nbEvtsRequested", &nbEventsRequested_);
654  gui_->addMonitorCounter("nbEvtsBuilt", &nbEventsBuilt_);
655  gui_->addMonitorCounter("nbEvtsSent", &nbEventsSent_);
656  gui_->addMonitorCounter("nbEvtsDiscarded", &nbEventsDiscarded_);
657 
658  gui_->addStandardParam("mode", &mode_);
659  gui_->addStandardParam("replay", &replay_);
660  gui_->addStandardParam("overwriteEvtId", &overwriteEvtId_);
661  gui_->addStandardParam("overwriteLsId", &overwriteLsId_);
662  gui_->addStandardParam("fakeLsUpdateSecs", &fakeLsUpdateSecs_);
663  gui_->addStandardParam("crc", &crc_);
664  gui_->addStandardParam("firstEvent", &firstEvent_);
665  gui_->addStandardParam("queueSize", &queueSize_);
666  gui_->addStandardParam("eventBufferSize", &eventBufferSize_);
667  gui_->addStandardParam("msgBufferSize", &msgBufferSize_);
668  gui_->addStandardParam("fedSizeMax", &fedSizeMax_);
669  gui_->addStandardParam("fedSizeMean", &fedSizeMean_);
670  gui_->addStandardParam("fedSizeWidth", &fedSizeWidth_);
671  gui_->addStandardParam("useFixedFedSize", &useFixedFedSize_);
672  gui_->addStandardParam("monSleepSec", &monSleepSec_);
673  gui_->addStandardParam("rcmsStateListener", fsm_.rcmsStateListener());
674  gui_->addStandardParam("foundRcmsStateListener",fsm_.foundRcmsStateListener());
675 
676 
678 
679  gui_->addItemChangedListener("crc",this);
680 
681 }
xdata::UnsignedInteger32 nbEventsInBU_
Definition: BU.h:201
xdata::Boolean overwriteEvtId_
Definition: BU.h:211
xdata::Double memUsedInMB_
Definition: BU.h:188
xdata::Boolean replay_
Definition: BU.h:209
void addStandardParam(CString_t &name, Param_t *param)
Definition: WebGUI.cc:134
void exportParameters()
Definition: WebGUI.cc:200
xdata::String mode_
Definition: BU.h:208
xdata::UnsignedInteger32 msgBufferSize_
Definition: BU.h:217
xdata::String class_
Definition: BU.h:184
StateMachine fsm_
Definition: BU.h:149
xdata::Boolean useFixedFedSize_
Definition: BU.h:221
Logger log_
Definition: BU.h:137
xdata::UnsignedInteger32 fedSizeWidth_
Definition: BU.h:220
void addMonitorParam(CString_t &name, Param_t *param)
Definition: WebGUI.cc:145
void addItemChangedListener(CString_t &name, xdata::ActionListener *l)
Definition: WebGUI.cc:238
xdata::Double rate_
Definition: BU.h:197
xdata::UnsignedInteger32 instance_
Definition: BU.h:185
xdata::Boolean overwriteLsId_
Definition: BU.h:212
xdata::Double throughput_
Definition: BU.h:195
xdata::UnsignedInteger32 fakeLsUpdateSecs_
Definition: BU.h:213
xdata::Double deltaSumOfSquares_
Definition: BU.h:192
xdata::Bag< xdaq2rc::ClassnameAndInstance > * rcmsStateListener()
Definition: StateMachine.h:72
WebGUI * gui_
Definition: BU.h:152
xdata::UnsignedInteger32 nbEventsSent_
Definition: BU.h:204
xdata::UnsignedInteger32 nbEventsRequested_
Definition: BU.h:202
xdata::Double rms_
Definition: BU.h:198
xdata::Boolean crc_
Definition: BU.h:210
xdata::Double deltaT_
Definition: BU.h:190
xdata::UnsignedInteger32 monSleepSec_
Definition: BU.h:222
xdata::UnsignedInteger32 deltaN_
Definition: BU.h:191
xdata::String hostname_
Definition: BU.h:186
xdata::UnsignedInteger32 fedSizeMax_
Definition: BU.h:218
xdata::String * stateName()
Definition: StateMachine.h:69
xdata::Double average_
Definition: BU.h:196
xdata::Boolean * foundRcmsStateListener()
Definition: StateMachine.h:78
xdata::UnsignedInteger32 deltaSumOfSizes_
Definition: BU.h:193
xdata::UnsignedInteger32 runNumber_
Definition: BU.h:187
xdata::UnsignedInteger32 eventBufferSize_
Definition: BU.h:216
xdata::UnsignedInteger32 fedSizeMean_
Definition: BU.h:219
xdata::UnsignedInteger32 nbEventsDiscarded_
Definition: BU.h:205
void addMonitorCounter(CString_t &name, Counter_t *counter)
Definition: WebGUI.cc:178
xdata::UnsignedInteger32 queueSize_
Definition: BU.h:215
xdata::UnsignedInteger32 nbEventsBuilt_
Definition: BU.h:203
xdata::String url_
Definition: BU.h:183
xdata::UnsignedInteger32 firstEvent_
Definition: BU.h:214
xoap::MessageReference BU::fsmCallback ( xoap::MessageReference  msg)
throw (xoap::exception::Exception
)

Definition at line 311 of file BU.cc.

References lumiQueryAPI::msg.

313 {
314  return fsm_.commandCallback(msg);
315 }
xoap::MessageReference commandCallback(xoap::MessageReference msg)
Definition: StateMachine.cc:71
StateMachine fsm_
Definition: BU.h:149
bool BU::generateEvent ( evf::BUEvent evt)
private

Definition at line 747 of file BU.cc.

References event(), fedh_struct::eventid, events_, evtNumber_, create_public_lumi_plots::exp, fedHeaderSize_, fedSizeMax_, fedSizeMean_, fedTrailerSize_, firstEvent_, gaussianMean_, gaussianWidth_, PlaybackRawDataProvider::getFEDRawData(), i, evf::BUEvent::initialize(), PlaybackRawDataProvider::instance(), nbEventsBuilt_, overwriteEvtId_, replay_, convertSQLiteXML::runNumber, PlaybackRawDataProvider::setFreeToEof(), useFixedFedSize_, validFedIds_, evf::BUEvent::writeFed(), evf::BUEvent::writeFedHeader(), and evf::BUEvent::writeFedTrailer().

Referenced by building().

748 {
749  // replay?
750  if (replay_.value_&&nbEventsBuilt_>=(uint32_t)events_.size())
751  {
754  return true;
755  }
756  // PLAYBACK mode
758 
759  unsigned int runNumber,evtNumber;
760 
761  FEDRawDataCollection* event=
762  PlaybackRawDataProvider::instance()->getFEDRawData(runNumber,evtNumber);
763  if(event == 0) return false;
764  evt->initialize(evtNumber);
765 
766  for (unsigned int i=0;i<validFedIds_.size();i++) {
767  unsigned int fedId =validFedIds_[i];
768  unsigned int fedSize=event->FEDData(fedId).size();
769  unsigned char* fedAddr=event->FEDData(fedId).data();
770  if (overwriteEvtId_.value_ && fedAddr != 0) {
771  fedh_t *fedHeader=(fedh_t*)fedAddr;
772  fedHeader->eventid=(fedHeader->eventid&0xFF000000)+(evtNumber&0x00FFFFFF);
773  }
774  if (fedSize>0) evt->writeFed(fedId,fedAddr,fedSize);
775  }
776  delete event;
777  }
778  // RANDOM mode
779  else {
780  unsigned int evtNumber=(firstEvent_+evtNumber_++)%0x1000000;
781  evt->initialize(evtNumber);
782  unsigned int fedSizeMin=fedHeaderSize_+fedTrailerSize_;
783  for (unsigned int i=0;i<validFedIds_.size();i++) {
784  unsigned int fedId(validFedIds_[i]);
785  unsigned int fedSize(fedSizeMean_);
786  if (!useFixedFedSize_) {
787  double logFedSize=CLHEP::RandGauss::shoot(gaussianMean_,gaussianWidth_);
788  fedSize=(unsigned int)(std::exp(logFedSize));
789  if (fedSize<fedSizeMin) fedSize=fedSizeMin;
790  if (fedSize>fedSizeMax_) fedSize=fedSizeMax_;
791  fedSize-=fedSize%8;
792  }
793 
794  evt->writeFed(fedId,0,fedSize);
795  evt->writeFedHeader(i);
796  evt->writeFedTrailer(i);
797  }
798 
799  }
800  return true;
801 }
int i
Definition: DBlmapReader.cc:9
void initialize(unsigned int evtNumber)
Definition: BUEvent.cc:74
xdata::Boolean overwriteEvtId_
Definition: BU.h:211
xdata::Boolean replay_
Definition: BU.h:209
double gaussianWidth_
Definition: BU.h:228
bool writeFed(unsigned int id, unsigned char *data, unsigned int size)
Definition: BUEvent.cc:83
virtual FEDRawDataCollection * getFEDRawData()
xdata::Boolean useFixedFedSize_
Definition: BU.h:221
unsigned int evtNumber_
Definition: BU.h:160
bool writeFedHeader(unsigned int i)
Definition: BUEvent.cc:106
static const int fedTrailerSize_
Definition: BU.h:254
static PlaybackRawDataProvider * instance()
double gaussianMean_
Definition: BU.h:227
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
bool writeFedTrailer(unsigned int i)
Definition: BUEvent.cc:123
std::vector< unsigned int > validFedIds_
Definition: BU.h:161
xdata::UnsignedInteger32 fedSizeMax_
Definition: BU.h:218
unsigned int eventid
Definition: fed_header.h:33
std::vector< evf::BUEvent * > events_
Definition: BU.h:155
xdata::UnsignedInteger32 fedSizeMean_
Definition: BU.h:219
static const int fedHeaderSize_
Definition: BU.h:253
xdata::UnsignedInteger32 nbEventsBuilt_
Definition: BU.h:203
xdata::UnsignedInteger32 firstEvent_
Definition: BU.h:214
bool BU::halting ( toolbox::task::WorkLoop *  wl)

Definition at line 280 of file BU.cc.

References builtIds_, alignCSCRings::e, events_, edm::hlt::Exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), freeIds_, fsm_, PlaybackRawDataProvider::instance(), isBuilding_, isHalting_, isSending_, lock(), log_, lumiQueryAPI::msg, nbEventsBuilt_, postBuild(), postSend(), replay_, PlaybackRawDataProvider::setFreeToEof(), and unlock().

281 {
282  try {
283  LOG4CPLUS_INFO(log_,"Start halting ...");
284  isHalting_=true;
285  if (isBuilding_&&isSending_) {
286  lock();
287  freeIds_.push(events_.size());
288  builtIds_.push(events_.size());
289  unlock();
290  postBuild();
291  postSend();
292  }
294  (!replay_.value_||nbEventsBuilt_<(uint32_t)events_.size())) {
296  while (!PlaybackRawDataProvider::instance()->areFilesClosed()) usleep(1000000);
297  usleep(100000);
298  }
299  LOG4CPLUS_INFO(log_,"Finished halting!");
300  fsm_.fireEvent("HaltDone",this);
301  }
302  catch (xcept::Exception &e) {
303  string msg = "halting FAILED: " + (string)e.what();
304  fsm_.fireFailed(msg,this);
305  }
306  return false;
307 }
void unlock()
Definition: BU.h:111
xdata::Boolean replay_
Definition: BU.h:209
std::queue< unsigned int > builtIds_
Definition: BU.h:158
StateMachine fsm_
Definition: BU.h:149
Logger log_
Definition: BU.h:137
bool isSending_
Definition: BU.h:164
void postBuild()
Definition: BU.h:113
void lock()
Definition: BU.h:110
void fireFailed(const std::string &errorMsg, void *originator)
static PlaybackRawDataProvider * instance()
bool isBuilding_
Definition: BU.h:163
bool isHalting_
Definition: BU.h:165
void fireEvent(const std::string &evtType, void *originator)
std::vector< evf::BUEvent * > events_
Definition: BU.h:155
std::queue< unsigned int > freeIds_
Definition: BU.h:157
void postSend()
Definition: BU.h:115
xdata::UnsignedInteger32 nbEventsBuilt_
Definition: BU.h:203
void BU::I2O_BU_ALLOCATE_Callback ( toolbox::mem::Reference *  bufRef)

Definition at line 319 of file BU.cc.

References fuAppDesc_, i, isHalting_, lock(), log_, lumiQueryAPI::msg, nbEventsInBU_, nbEventsRequested_, postRqst(), rqstIds_, and unlock().

Referenced by BU().

320 {
321  if (isHalting_) {
322  LOG4CPLUS_WARN(log_,"Ignore BU_ALLOCATE message while halting.");
323  bufRef->release();
324  return;
325  }
326 
327  I2O_MESSAGE_FRAME *stdMsg;
328  I2O_BU_ALLOCATE_MESSAGE_FRAME *msg;
329 
330  stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
331  msg =(I2O_BU_ALLOCATE_MESSAGE_FRAME*)stdMsg;
332 
333  if (0==fuAppDesc_) {
334  I2O_TID fuTid=stdMsg->InitiatorAddress;
335  fuAppDesc_=i2o::utils::getAddressMap()->getApplicationDescriptor(fuTid);
336  }
337 
338  for (unsigned int i=0;i<msg->n;i++) {
339  unsigned int fuResourceId=msg->allocate[i].fuTransactionId;
340  lock();
341  rqstIds_.push(fuResourceId);
342  postRqst();
344  nbEventsInBU_++;
345  unlock();
346  }
347 
348  bufRef->release();
349 }
xdata::UnsignedInteger32 nbEventsInBU_
Definition: BU.h:201
int i
Definition: DBlmapReader.cc:9
void unlock()
Definition: BU.h:111
Logger log_
Definition: BU.h:137
std::queue< unsigned int > rqstIds_
Definition: BU.h:156
void lock()
Definition: BU.h:110
xdata::UnsignedInteger32 nbEventsRequested_
Definition: BU.h:202
bool isHalting_
Definition: BU.h:165
xdaq::ApplicationDescriptor * fuAppDesc_
Definition: BU.h:143
void postRqst()
Definition: BU.h:117
void BU::I2O_BU_DISCARD_Callback ( toolbox::mem::Reference *  bufRef)

Definition at line 353 of file BU.cc.

References freeIds_, isHalting_, lock(), log_, lumiQueryAPI::msg, nbEventsDiscarded_, postBuild(), query::result, sentIds_, and unlock().

Referenced by BU().

354 {
355  if (isHalting_) {
356  LOG4CPLUS_WARN(log_,"Ignore BU_DISCARD message while halting.");
357  bufRef->release();
358  return;
359  }
360 
361  I2O_MESSAGE_FRAME *stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
362  I2O_BU_DISCARD_MESSAGE_FRAME*msg =(I2O_BU_DISCARD_MESSAGE_FRAME*)stdMsg;
363  unsigned int buResourceId=msg->buResourceId[0];
364 
365  lock();
366  int result=sentIds_.erase(buResourceId);
367  unlock();
368 
369  if (!result) {
370  LOG4CPLUS_ERROR(log_,"can't discard unknown buResourceId '"<<buResourceId<<"'");
371  }
372  else {
373  lock();
374  freeIds_.push(buResourceId);
375  nbEventsDiscarded_.value_++;
376  unlock();
377  postBuild();
378  }
379 
380  bufRef->release();
381 }
void unlock()
Definition: BU.h:111
Logger log_
Definition: BU.h:137
void postBuild()
Definition: BU.h:113
void lock()
Definition: BU.h:110
tuple result
Definition: query.py:137
bool isHalting_
Definition: BU.h:165
std::queue< unsigned int > freeIds_
Definition: BU.h:157
std::set< unsigned int > sentIds_
Definition: BU.h:159
xdata::UnsignedInteger32 nbEventsDiscarded_
Definition: BU.h:205
void evf::BU::lock ( void  )
inlineprivate

Definition at line 110 of file BU.h.

References lock_.

Referenced by building(), halting(), I2O_BU_ALLOCATE_Callback(), I2O_BU_DISCARD_Callback(), monitoring(), sending(), and stopping().

110 { sem_wait(&lock_); }
sem_t lock_
Definition: BU.h:243
bool BU::monitoring ( toolbox::task::WorkLoop *  wl)

Definition at line 562 of file BU.cc.

References average_, deltaN_, deltaSumOfSizes_, deltaSumOfSquares_, deltaT(), deltaT_, gui_, lock(), timingPdfMaker::mean, evf::WebGUI::monInfoSpace(), monLastN_, monLastSumOfSizes_, monLastSumOfSquares_, monSleepSec_, monStartTime_, nbEventsBuilt_, rate_, rms_, stor::utils::sleep(), mathSSE::sqrt(), sumOfSizes_, sumOfSquares_, throughput_, and unlock().

Referenced by startMonitoringWorkLoop().

563 {
564  struct timeval monEndTime;
565  struct timezone timezone;
566 
567  gettimeofday(&monEndTime,&timezone);
568 
569  lock();
570  unsigned int monN =nbEventsBuilt_.value_;
571  uint64_t monSumOfSquares=sumOfSquares_;
572  unsigned int monSumOfSizes =sumOfSizes_;
573  uint64_t deltaSumOfSquares;
574  unlock();
575 
576  gui_->monInfoSpace()->lock();
577 
578  deltaT_.value_=deltaT(&monStartTime_,&monEndTime);
579  monStartTime_=monEndTime;
580 
581  deltaN_.value_=monN-monLastN_;
582  monLastN_=monN;
583 
584  deltaSumOfSquares=monSumOfSquares-monLastSumOfSquares_;
585  deltaSumOfSquares_.value_=(double)deltaSumOfSquares;
586  monLastSumOfSquares_=monSumOfSquares;
587 
588  deltaSumOfSizes_.value_=monSumOfSizes-monLastSumOfSizes_;
589  monLastSumOfSizes_=monSumOfSizes;
590 
591  if (deltaT_.value_!=0) {
592  throughput_=deltaSumOfSizes_.value_/deltaT_.value_;
593  rate_ =deltaN_.value_/deltaT_.value_;
594  }
595  else {
596  throughput_=0.0;
597  rate_ =0.0;
598  }
599 
600  double meanOfSquares,mean,squareOfMean,variance;
601 
602  if(deltaN_.value_!=0) {
603  meanOfSquares=deltaSumOfSquares_.value_/((double)(deltaN_.value_));
604  mean=((double)(deltaSumOfSizes_.value_))/((double)(deltaN_.value_));
605  squareOfMean=mean*mean;
606  variance=meanOfSquares-squareOfMean;
607  average_=deltaSumOfSizes_.value_/deltaN_.value_;
608  rms_ =std::sqrt(variance);
609  }
610  else {
611  average_=0.0;
612  rms_ =0.0;
613  }
614 
615  gui_->monInfoSpace()->unlock();
616 
617  ::sleep(monSleepSec_.value_);
618 
619  return true;
620 }
void unlock()
Definition: BU.h:111
double deltaT(const struct timeval *start, const struct timeval *end)
Definition: BU.cc:727
xdata::Double rate_
Definition: BU.h:197
void sleep(Duration_t)
Definition: Utils.h:163
xdata::Double throughput_
Definition: BU.h:195
void lock()
Definition: BU.h:110
xdata::Double deltaSumOfSquares_
Definition: BU.h:192
WebGUI * gui_
Definition: BU.h:152
unsigned int sumOfSizes_
Definition: BU.h:236
xdata::Double rms_
Definition: BU.h:198
T sqrt(T t)
Definition: SSEVec.h:46
uint64_t sumOfSquares_
Definition: BU.h:235
xdata::Double deltaT_
Definition: BU.h:190
xdata::UnsignedInteger32 monSleepSec_
Definition: BU.h:222
xdata::UnsignedInteger32 deltaN_
Definition: BU.h:191
unsigned long long uint64_t
Definition: Time.h:15
xdata::InfoSpace * monInfoSpace()
Definition: WebGUI.h:72
xdata::Double average_
Definition: BU.h:196
xdata::UnsignedInteger32 deltaSumOfSizes_
Definition: BU.h:193
unsigned int monLastN_
Definition: BU.h:232
unsigned int monLastSumOfSizes_
Definition: BU.h:234
uint64_t monLastSumOfSquares_
Definition: BU.h:233
xdata::UnsignedInteger32 nbEventsBuilt_
Definition: BU.h:203
struct timeval monStartTime_
Definition: BU.h:231
void evf::BU::postBuild ( )
inlineprivate

Definition at line 113 of file BU.h.

References buildSem_.

Referenced by halting(), I2O_BU_DISCARD_Callback(), and stopping().

113 { sem_post(&buildSem_); }
sem_t buildSem_
Definition: BU.h:244
void evf::BU::postRqst ( )
inlineprivate

Definition at line 117 of file BU.h.

References rqstSem_.

Referenced by I2O_BU_ALLOCATE_Callback().

117 { sem_post(&rqstSem_); }
sem_t rqstSem_
Definition: BU.h:246
void evf::BU::postSend ( )
inlineprivate

Definition at line 115 of file BU.h.

References sendSem_.

Referenced by building(), halting(), and stopping().

115 { sem_post(&sendSem_); }
sem_t sendSem_
Definition: BU.h:245
void BU::reset ( void  )
private

Definition at line 685 of file BU.cc.

References average_, buildSem_, builtIds_, deltaN_, deltaSumOfSizes_, deltaSumOfSquares_, deltaT_, eventBufferSize_, events_, fakeLs_, freeIds_, gui_, i, lock_, monLastN_, monLastSumOfSizes_, monLastSumOfSquares_, queueSize_, rate_, evf::WebGUI::resetCounters(), rms_, rqstIds_, rqstSem_, sendSem_, sentIds_, throughput_, and validFedIds_.

Referenced by configuring(), and stopping().

686 {
687  gui_->resetCounters();
688 
689  deltaT_ =0.0;
690  deltaN_ = 0;
691  deltaSumOfSquares_ = 0;
692  deltaSumOfSizes_ = 0;
693 
694  throughput_ =0.0;
695  average_ = 0;
696  rate_ = 0;
697  rms_ = 0;
698 
699  monLastN_ = 0;
701  monLastSumOfSizes_ = 0;
702 
703  while (events_.size()) {
704  delete events_.back();
705  events_.pop_back();
706  }
707 
708  while (!rqstIds_.empty()) rqstIds_.pop();
709  while (!freeIds_.empty()) freeIds_.pop();
710  while (!builtIds_.empty()) builtIds_.pop();
711  sentIds_.clear();
712 
713  sem_init(&lock_,0,1);
714  sem_init(&buildSem_,0,queueSize_);
715  sem_init(&sendSem_,0,0);
716  sem_init(&rqstSem_,0,0);
717 
718  for (unsigned int i=0;i<queueSize_;i++) {
719  events_.push_back(new BUEvent(i,eventBufferSize_));
720  freeIds_.push(i);
721  }
722  validFedIds_.clear();
723  fakeLs_=0;
724 }
int i
Definition: DBlmapReader.cc:9
void resetCounters()
Definition: WebGUI.cc:217
std::queue< unsigned int > builtIds_
Definition: BU.h:158
std::queue< unsigned int > rqstIds_
Definition: BU.h:156
xdata::Double rate_
Definition: BU.h:197
sem_t sendSem_
Definition: BU.h:245
xdata::Double throughput_
Definition: BU.h:195
xdata::Double deltaSumOfSquares_
Definition: BU.h:192
WebGUI * gui_
Definition: BU.h:152
xdata::Double rms_
Definition: BU.h:198
sem_t lock_
Definition: BU.h:243
xdata::Double deltaT_
Definition: BU.h:190
xdata::UnsignedInteger32 deltaN_
Definition: BU.h:191
std::vector< unsigned int > validFedIds_
Definition: BU.h:161
sem_t buildSem_
Definition: BU.h:244
sem_t rqstSem_
Definition: BU.h:246
xdata::Double average_
Definition: BU.h:196
xdata::UnsignedInteger32 deltaSumOfSizes_
Definition: BU.h:193
std::vector< evf::BUEvent * > events_
Definition: BU.h:155
xdata::UnsignedInteger32 eventBufferSize_
Definition: BU.h:216
std::queue< unsigned int > freeIds_
Definition: BU.h:157
unsigned int monLastN_
Definition: BU.h:232
std::set< unsigned int > sentIds_
Definition: BU.h:159
unsigned int monLastSumOfSizes_
Definition: BU.h:234
uint64_t monLastSumOfSquares_
Definition: BU.h:233
xdata::UnsignedInteger32 queueSize_
Definition: BU.h:215
unsigned int fakeLs_
Definition: BU.h:224
bool BU::sending ( toolbox::task::WorkLoop *  wl)

Definition at line 501 of file BU.cc.

References buAppContext_, buAppDesc_, builtIds_, createMsgChain(), events_, evf::BUEvent::evtSize(), fuAppDesc_, isHalting_, isSending_, lock(), log_, lumiQueryAPI::msg, nbEventsInBU_, nbEventsSent_, rqstIds_, sentIds_, sumOfSizes_, sumOfSquares_, unlock(), waitRqst(), and waitSend().

Referenced by startSendingWorkLoop().

502 {
503  waitSend();
504  lock();
505  unsigned int buResourceId=builtIds_.front(); builtIds_.pop();
506  unlock();
507 
508  if (buResourceId>=(uint32_t)events_.size()) {
509  LOG4CPLUS_INFO(log_,"shutdown 'sending' workloop.");
510  isSending_=false;
511  return false;
512  }
513 
514  if (!isHalting_) {
515  waitRqst();
516  lock();
517  unsigned int fuResourceId=rqstIds_.front(); rqstIds_.pop();
518  unlock();
519 
520  BUEvent* evt=events_[buResourceId];
521  toolbox::mem::Reference* msg=createMsgChain(evt,fuResourceId);
522 
523  lock();
524  sumOfSquares_+=(uint64_t)evt->evtSize()*(uint64_t)evt->evtSize();
525  sumOfSizes_ +=evt->evtSize();
526  nbEventsInBU_--;
527  nbEventsSent_++;
528  sentIds_.insert(buResourceId);
529  unlock();
530 
531  buAppContext_->postFrame(msg,buAppDesc_,fuAppDesc_);
532  }
533 
534  return true;
535 }
xdata::UnsignedInteger32 nbEventsInBU_
Definition: BU.h:201
void unlock()
Definition: BU.h:111
std::queue< unsigned int > builtIds_
Definition: BU.h:158
xdaq::ApplicationContext * buAppContext_
Definition: BU.h:146
toolbox::mem::Reference * createMsgChain(evf::BUEvent *evt, unsigned int fuResourceId)
Definition: BU.cc:805
Logger log_
Definition: BU.h:137
std::queue< unsigned int > rqstIds_
Definition: BU.h:156
bool isSending_
Definition: BU.h:164
void lock()
Definition: BU.h:110
void waitSend()
Definition: BU.h:114
void waitRqst()
Definition: BU.h:116
unsigned int sumOfSizes_
Definition: BU.h:236
xdata::UnsignedInteger32 nbEventsSent_
Definition: BU.h:204
unsigned int evtSize() const
Definition: BUEvent.h:29
uint64_t sumOfSquares_
Definition: BU.h:235
xdaq::ApplicationDescriptor * buAppDesc_
Definition: BU.h:140
bool isHalting_
Definition: BU.h:165
unsigned long long uint64_t
Definition: Time.h:15
std::vector< evf::BUEvent * > events_
Definition: BU.h:155
xdaq::ApplicationDescriptor * fuAppDesc_
Definition: BU.h:143
std::set< unsigned int > sentIds_
Definition: BU.h:159
void BU::startBuildingWorkLoop ( )
throw (evf::Exception
)

Definition at line 420 of file BU.cc.

References asBuilding_, building(), alignCSCRings::e, edm::hlt::Exception, isBuilding_, log_, lumiQueryAPI::msg, sourceId_, and wlBuilding_.

Referenced by enabling().

421 {
422  try {
423  LOG4CPLUS_INFO(log_,"Start 'building' workloop");
424  wlBuilding_=
425  toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+
426  "Building",
427  "waiting");
428  if (!wlBuilding_->isActive()) wlBuilding_->activate();
429  asBuilding_=toolbox::task::bind(this,&BU::building,sourceId_+"Building");
430  wlBuilding_->submit(asBuilding_);
431  isBuilding_=true;
432  }
433  catch (xcept::Exception& e) {
434  string msg = "Failed to start workloop 'building'.";
435  XCEPT_RETHROW(evf::Exception,msg,e);
436  }
437 }
Logger log_
Definition: BU.h:137
toolbox::task::ActionSignature * asBuilding_
Definition: BU.h:169
std::string sourceId_
Definition: BU.h:180
toolbox::task::WorkLoop * wlBuilding_
Definition: BU.h:168
bool isBuilding_
Definition: BU.h:163
bool building(toolbox::task::WorkLoop *wl)
Definition: BU.cc:441
void BU::startMonitoringWorkLoop ( )
throw (evf::Exception
)

Definition at line 539 of file BU.cc.

References asMonitoring_, alignCSCRings::e, edm::hlt::Exception, log_, monitoring(), monStartTime_, lumiQueryAPI::msg, sourceId_, and wlMonitoring_.

Referenced by BU().

540 {
541  struct timezone timezone;
542  gettimeofday(&monStartTime_,&timezone);
543 
544  try {
545  LOG4CPLUS_INFO(log_,"Start 'monitoring' workloop");
547  toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+
548  "Monitoring",
549  "waiting");
550  if (!wlMonitoring_->isActive()) wlMonitoring_->activate();
551  asMonitoring_=toolbox::task::bind(this,&BU::monitoring,sourceId_+"Monitoring");
552  wlMonitoring_->submit(asMonitoring_);
553  }
554  catch (xcept::Exception& e) {
555  string msg = "Failed to start workloop 'monitoring'.";
556  XCEPT_RETHROW(evf::Exception,msg,e);
557  }
558 }
toolbox::task::ActionSignature * asMonitoring_
Definition: BU.h:177
Logger log_
Definition: BU.h:137
bool monitoring(toolbox::task::WorkLoop *wl)
Definition: BU.cc:562
std::string sourceId_
Definition: BU.h:180
toolbox::task::WorkLoop * wlMonitoring_
Definition: BU.h:176
struct timeval monStartTime_
Definition: BU.h:231
void BU::startSendingWorkLoop ( )
throw (evf::Exception
)

Definition at line 480 of file BU.cc.

References asSending_, alignCSCRings::e, edm::hlt::Exception, isSending_, log_, lumiQueryAPI::msg, sending(), sourceId_, and wlSending_.

Referenced by enabling().

481 {
482  try {
483  LOG4CPLUS_INFO(log_,"Start 'sending' workloop");
484  wlSending_=toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+
485  "Sending",
486  "waiting");
487  if (!wlSending_->isActive()) wlSending_->activate();
488 
489  asSending_=toolbox::task::bind(this,&BU::sending,sourceId_+"Sending");
490  wlSending_->submit(asSending_);
491  isSending_=true;
492  }
493  catch (xcept::Exception& e) {
494  string msg = "Failed to start workloop 'sending'.";
495  XCEPT_RETHROW(evf::Exception,msg,e);
496  }
497 }
bool sending(toolbox::task::WorkLoop *wl)
Definition: BU.cc:501
Logger log_
Definition: BU.h:137
bool isSending_
Definition: BU.h:164
toolbox::task::WorkLoop * wlSending_
Definition: BU.h:172
std::string sourceId_
Definition: BU.h:180
toolbox::task::ActionSignature * asSending_
Definition: BU.h:173
bool BU::stopping ( toolbox::task::WorkLoop *  wl)

Definition at line 227 of file BU.cc.

References builtIds_, alignCSCRings::e, events_, edm::hlt::Exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), freeIds_, fsm_, PlaybackRawDataProvider::instance(), lock(), log_, lumiQueryAPI::msg, postBuild(), postSend(), reset(), sentIds_, PlaybackRawDataProvider::setFreeToEof(), stor::utils::sleep(), and unlock().

228 {
229  try {
230  LOG4CPLUS_INFO(log_,"Start stopping :) ...");
231 
232  if (0!=PlaybackRawDataProvider::instance()) { /*&&
233  (!replay_.value_||nbEventsBuilt_<(uint32_t)events_.size())) { */
234  lock();
235  freeIds_.push(events_.size());
236  unlock();
237  postBuild();
238  while (!builtIds_.empty()) {
239  LOG4CPLUS_INFO(log_,"wait to flush ... #builtIds="<<builtIds_.size());
240  ::sleep(1);
241  }
242  // let the playback go to the last event and exit
244  while (!PlaybackRawDataProvider::instance()->areFilesClosed()) usleep(1000000);
245  usleep(100000);
246  }
247 
248  lock();
249  builtIds_.push(events_.size());
250  unlock();
251 
252  postSend();
253  while (!sentIds_.empty()) {
254  LOG4CPLUS_INFO(log_,"wait to flush ...");
255  ::sleep(1);
256  }
257  reset();
258  //postBuild();
259  /* this is not needed and should not run if reset is called
260  if (0!=PlaybackRawDataProvider::instance()&&
261  (replay_.value_&&nbEventsBuilt_>=(uint32_t)events_.size())) {
262  lock();
263  freeIds_.push(events_.size());
264  unlock();
265  postBuild();
266  }
267  */
268  LOG4CPLUS_INFO(log_,"Finished stopping!");
269  fsm_.fireEvent("StopDone",this);
270  }
271  catch (xcept::Exception &e) {
272  string msg = "stopping FAILED: " + (string)e.what();
273  fsm_.fireFailed(msg,this);
274  }
275  return false;
276 }
void unlock()
Definition: BU.h:111
std::queue< unsigned int > builtIds_
Definition: BU.h:158
StateMachine fsm_
Definition: BU.h:149
Logger log_
Definition: BU.h:137
void postBuild()
Definition: BU.h:113
void sleep(Duration_t)
Definition: Utils.h:163
void lock()
Definition: BU.h:110
void fireFailed(const std::string &errorMsg, void *originator)
static PlaybackRawDataProvider * instance()
void reset()
Definition: BU.cc:685
void fireEvent(const std::string &evtType, void *originator)
std::vector< evf::BUEvent * > events_
Definition: BU.h:155
std::queue< unsigned int > freeIds_
Definition: BU.h:157
std::set< unsigned int > sentIds_
Definition: BU.h:159
void postSend()
Definition: BU.h:115
void evf::BU::unlock ( void  )
inlineprivate

Definition at line 111 of file BU.h.

References lock_.

Referenced by building(), halting(), I2O_BU_ALLOCATE_Callback(), I2O_BU_DISCARD_Callback(), monitoring(), sending(), and stopping().

111 { sem_post(&lock_); }
sem_t lock_
Definition: BU.h:243
void evf::BU::waitBuild ( )
inlineprivate

Definition at line 112 of file BU.h.

References buildSem_.

Referenced by building().

112 { sem_wait(&buildSem_); }
sem_t buildSem_
Definition: BU.h:244
void evf::BU::waitRqst ( )
inlineprivate

Definition at line 116 of file BU.h.

References rqstSem_.

Referenced by sending().

116 { sem_wait(&rqstSem_); }
sem_t rqstSem_
Definition: BU.h:246
void evf::BU::waitSend ( )
inlineprivate

Definition at line 114 of file BU.h.

References sendSem_.

Referenced by sending().

114 { sem_wait(&sendSem_); }
sem_t sendSem_
Definition: BU.h:245
void BU::webPageRequest ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)

Definition at line 402 of file BU.cc.

References recoMuon::in, mergeVDriftHistosByStation::name, and dbtoconf::out.

Referenced by BU().

404 {
405  string name=in->getenv("PATH_INFO");
406  if (name.empty()) name="defaultWebPage";
407  static_cast<xgi::MethodSignature*>(gui_->getMethod(name))->invoke(in,out);
408 }
WebGUI * gui_
Definition: BU.h:152
tuple out
Definition: dbtoconf.py:99
evf::BU::XDAQ_INSTANTIATOR ( )

Member Data Documentation

toolbox::task::ActionSignature* evf::BU::asBuilding_
private

Definition at line 169 of file BU.h.

Referenced by startBuildingWorkLoop().

toolbox::task::ActionSignature* evf::BU::asMonitoring_
private

Definition at line 177 of file BU.h.

Referenced by startMonitoringWorkLoop().

toolbox::task::ActionSignature* evf::BU::asSending_
private

Definition at line 173 of file BU.h.

Referenced by startSendingWorkLoop().

xdata::Double evf::BU::average_
private

Definition at line 196 of file BU.h.

Referenced by exportParameters(), monitoring(), and reset().

xdaq::ApplicationContext* evf::BU::buAppContext_
private

Definition at line 146 of file BU.h.

Referenced by sending().

xdaq::ApplicationDescriptor* evf::BU::buAppDesc_
private

Definition at line 140 of file BU.h.

Referenced by createMsgChain(), and sending().

sem_t evf::BU::buildSem_
private

Definition at line 244 of file BU.h.

Referenced by postBuild(), reset(), and waitBuild().

std::queue<unsigned int> evf::BU::builtIds_
private

Definition at line 158 of file BU.h.

Referenced by building(), halting(), reset(), sending(), and stopping().

xdata::String evf::BU::class_
private

Definition at line 184 of file BU.h.

Referenced by BU(), and exportParameters().

xdata::Boolean evf::BU::crc_
private

Definition at line 210 of file BU.h.

Referenced by actionPerformed(), BU(), and exportParameters().

xdata::UnsignedInteger32 evf::BU::deltaN_
private

Definition at line 191 of file BU.h.

Referenced by exportParameters(), monitoring(), and reset().

xdata::UnsignedInteger32 evf::BU::deltaSumOfSizes_
private

Definition at line 193 of file BU.h.

Referenced by exportParameters(), monitoring(), and reset().

xdata::Double evf::BU::deltaSumOfSquares_
private

Definition at line 192 of file BU.h.

Referenced by exportParameters(), monitoring(), and reset().

xdata::Double evf::BU::deltaT_
private

Definition at line 190 of file BU.h.

Referenced by exportParameters(), monitoring(), and reset().

xdata::UnsignedInteger32 evf::BU::eventBufferSize_
private

Definition at line 216 of file BU.h.

Referenced by exportParameters(), and reset().

std::vector<evf::BUEvent*> evf::BU::events_
private

Definition at line 155 of file BU.h.

Referenced by building(), generateEvent(), halting(), reset(), sending(), stopping(), and ~BU().

unsigned int evf::BU::evtNumber_
private

Definition at line 160 of file BU.h.

Referenced by generateEvent().

unsigned int evf::BU::fakeLs_
private

Definition at line 224 of file BU.h.

Referenced by createMsgChain(), and reset().

xdata::UnsignedInteger32 evf::BU::fakeLsUpdateSecs_
private

Definition at line 213 of file BU.h.

Referenced by createMsgChain(), and exportParameters().

const int evf::BU::fedHeaderSize_ =sizeof(fedh_t)
staticprivate

Definition at line 253 of file BU.h.

Referenced by createMsgChain(), and generateEvent().

xdata::UnsignedInteger32 evf::BU::fedSizeMax_
private

Definition at line 218 of file BU.h.

Referenced by exportParameters(), and generateEvent().

xdata::UnsignedInteger32 evf::BU::fedSizeMean_
private

Definition at line 219 of file BU.h.

Referenced by BU(), exportParameters(), and generateEvent().

xdata::UnsignedInteger32 evf::BU::fedSizeWidth_
private

Definition at line 220 of file BU.h.

Referenced by BU(), and exportParameters().

const int evf::BU::fedTrailerSize_ =sizeof(fedt_t)
staticprivate

Definition at line 254 of file BU.h.

Referenced by createMsgChain(), and generateEvent().

xdata::UnsignedInteger32 evf::BU::firstEvent_
private

Definition at line 214 of file BU.h.

Referenced by exportParameters(), and generateEvent().

std::queue<unsigned int> evf::BU::freeIds_
private

Definition at line 157 of file BU.h.

Referenced by building(), halting(), I2O_BU_DISCARD_Callback(), reset(), and stopping().

const int evf::BU::frlHeaderSize_ =sizeof(frlh_t)
staticprivate

Definition at line 252 of file BU.h.

Referenced by createMsgChain().

StateMachine evf::BU::fsm_
private

Definition at line 149 of file BU.h.

Referenced by BU(), configuring(), enabling(), exportParameters(), halting(), and stopping().

xdaq::ApplicationDescriptor* evf::BU::fuAppDesc_
private

Definition at line 143 of file BU.h.

Referenced by createMsgChain(), I2O_BU_ALLOCATE_Callback(), and sending().

double evf::BU::gaussianMean_
private

Definition at line 227 of file BU.h.

Referenced by BU(), and generateEvent().

double evf::BU::gaussianWidth_
private

Definition at line 228 of file BU.h.

Referenced by BU(), and generateEvent().

WebGUI* evf::BU::gui_
private

Definition at line 152 of file BU.h.

Referenced by actionPerformed(), BU(), exportParameters(), monitoring(), and reset().

xdata::String evf::BU::hostname_
private

Definition at line 186 of file BU.h.

Referenced by BU(), and exportParameters().

toolbox::mem::Pool* evf::BU::i2oPool_
private

Definition at line 240 of file BU.h.

Referenced by actionPerformed(), BU(), and createMsgChain().

xdata::UnsignedInteger32 evf::BU::instance_
private

Definition at line 185 of file BU.h.

Referenced by BU(), and exportParameters().

bool evf::BU::isBuilding_
private

Definition at line 163 of file BU.h.

Referenced by building(), enabling(), halting(), and startBuildingWorkLoop().

bool evf::BU::isHalting_
private
bool evf::BU::isSending_
private

Definition at line 164 of file BU.h.

Referenced by enabling(), halting(), sending(), and startSendingWorkLoop().

timeval evf::BU::lastLsUpdate_
private

Definition at line 225 of file BU.h.

Referenced by createMsgChain().

sem_t evf::BU::lock_
private

Definition at line 243 of file BU.h.

Referenced by lock(), reset(), and unlock().

Logger evf::BU::log_
private
xdata::Double evf::BU::memUsedInMB_
private

Definition at line 188 of file BU.h.

Referenced by actionPerformed(), and exportParameters().

xdata::String evf::BU::mode_
private

Definition at line 208 of file BU.h.

Referenced by actionPerformed(), and exportParameters().

unsigned int evf::BU::monLastN_
private

Definition at line 232 of file BU.h.

Referenced by monitoring(), and reset().

unsigned int evf::BU::monLastSumOfSizes_
private

Definition at line 234 of file BU.h.

Referenced by monitoring(), and reset().

uint64_t evf::BU::monLastSumOfSquares_
private

Definition at line 233 of file BU.h.

Referenced by monitoring(), and reset().

xdata::UnsignedInteger32 evf::BU::monSleepSec_
private

Definition at line 222 of file BU.h.

Referenced by exportParameters(), and monitoring().

struct timeval evf::BU::monStartTime_
private

Definition at line 231 of file BU.h.

Referenced by monitoring(), and startMonitoringWorkLoop().

xdata::UnsignedInteger32 evf::BU::msgBufferSize_
private

Definition at line 217 of file BU.h.

Referenced by createMsgChain(), and exportParameters().

xdata::UnsignedInteger32 evf::BU::nbEventsBuilt_
private

Definition at line 203 of file BU.h.

Referenced by building(), exportParameters(), generateEvent(), halting(), and monitoring().

xdata::UnsignedInteger32 evf::BU::nbEventsDiscarded_
private

Definition at line 205 of file BU.h.

Referenced by exportParameters(), and I2O_BU_DISCARD_Callback().

xdata::UnsignedInteger32 evf::BU::nbEventsInBU_
private

Definition at line 201 of file BU.h.

Referenced by exportParameters(), I2O_BU_ALLOCATE_Callback(), and sending().

xdata::UnsignedInteger32 evf::BU::nbEventsRequested_
private

Definition at line 202 of file BU.h.

Referenced by exportParameters(), and I2O_BU_ALLOCATE_Callback().

xdata::UnsignedInteger32 evf::BU::nbEventsSent_
private

Definition at line 204 of file BU.h.

Referenced by exportParameters(), and sending().

xdata::Boolean evf::BU::overwriteEvtId_
private

Definition at line 211 of file BU.h.

Referenced by exportParameters(), and generateEvent().

xdata::Boolean evf::BU::overwriteLsId_
private

Definition at line 212 of file BU.h.

Referenced by createMsgChain(), and exportParameters().

xdata::UnsignedInteger32 evf::BU::queueSize_
private

Definition at line 215 of file BU.h.

Referenced by exportParameters(), and reset().

xdata::Double evf::BU::rate_
private

Definition at line 197 of file BU.h.

Referenced by exportParameters(), monitoring(), and reset().

xdata::Boolean evf::BU::replay_
private

Definition at line 209 of file BU.h.

Referenced by exportParameters(), generateEvent(), and halting().

xdata::Double evf::BU::rms_
private

Definition at line 198 of file BU.h.

Referenced by exportParameters(), monitoring(), and reset().

std::queue<unsigned int> evf::BU::rqstIds_
private

Definition at line 156 of file BU.h.

Referenced by I2O_BU_ALLOCATE_Callback(), reset(), and sending().

sem_t evf::BU::rqstSem_
private

Definition at line 246 of file BU.h.

Referenced by postRqst(), reset(), and waitRqst().

xdata::UnsignedInteger32 evf::BU::runNumber_
private

Definition at line 187 of file BU.h.

Referenced by exportParameters().

sem_t evf::BU::sendSem_
private

Definition at line 245 of file BU.h.

Referenced by postSend(), reset(), and waitSend().

std::set<unsigned int> evf::BU::sentIds_
private

Definition at line 159 of file BU.h.

Referenced by I2O_BU_DISCARD_Callback(), reset(), sending(), and stopping().

std::string evf::BU::sourceId_
private

Definition at line 180 of file BU.h.

Referenced by BU(), startBuildingWorkLoop(), startMonitoringWorkLoop(), and startSendingWorkLoop().

unsigned int evf::BU::sumOfSizes_
private

Definition at line 236 of file BU.h.

Referenced by monitoring(), and sending().

uint64_t evf::BU::sumOfSquares_
private

Definition at line 235 of file BU.h.

Referenced by monitoring(), and sending().

xdata::Double evf::BU::throughput_
private

Definition at line 195 of file BU.h.

Referenced by exportParameters(), monitoring(), and reset().

xdata::String evf::BU::url_
private

Definition at line 183 of file BU.h.

Referenced by BU(), and exportParameters().

xdata::Boolean evf::BU::useFixedFedSize_
private

Definition at line 221 of file BU.h.

Referenced by exportParameters(), and generateEvent().

std::vector<unsigned int> evf::BU::validFedIds_
private

Definition at line 161 of file BU.h.

Referenced by createMsgChain(), enabling(), generateEvent(), and reset().

toolbox::task::WorkLoop* evf::BU::wlBuilding_
private

Definition at line 168 of file BU.h.

Referenced by startBuildingWorkLoop().

toolbox::task::WorkLoop* evf::BU::wlMonitoring_
private

Definition at line 176 of file BU.h.

Referenced by startMonitoringWorkLoop().

toolbox::task::WorkLoop* evf::BU::wlSending_
private

Definition at line 172 of file BU.h.

Referenced by startSendingWorkLoop().