00001
00002
00003
00004
00005
00006
00007
00009
00010
00011 #include "EventFilter/AutoBU/interface/BU.h"
00012
00013 #include "FWCore/Utilities/interface/CRC16.h"
00014
00015 #include "DataFormats/FEDRawData/interface/FEDNumbering.h"
00016 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
00017
00018 #include "xoap/SOAPEnvelope.h"
00019 #include "xoap/SOAPBody.h"
00020 #include "xoap/domutils.h"
00021
00022 #include <netinet/in.h>
00023 #include <sstream>
00024
00025
00026 using namespace std;
00027 using namespace evf;
00028
00029
00031
00033
00034
00035 BU::BU(xdaq::ApplicationStub *s)
00036 : xdaq::Application(s)
00037 , log_(getApplicationLogger())
00038 , buAppDesc_(getApplicationDescriptor())
00039 , fuAppDesc_(0)
00040 , buAppContext_(getApplicationContext())
00041 , fsm_(this)
00042 , gui_(0)
00043 , evtNumber_(0)
00044 , isBuilding_(false)
00045 , isSending_(false)
00046 , isHalting_(false)
00047 , wlBuilding_(0)
00048 , asBuilding_(0)
00049 , wlSending_(0)
00050 , asSending_(0)
00051 , wlMonitoring_(0)
00052 , asMonitoring_(0)
00053 , instance_(0)
00054 , runNumber_(0)
00055 , memUsedInMB_(0.0)
00056 , deltaT_(0.0)
00057 , deltaN_(0)
00058 , deltaSumOfSquares_(0)
00059 , deltaSumOfSizes_(0)
00060 , throughput_(0.0)
00061 , average_(0.0)
00062 , rate_(0.0)
00063 , rms_(0.0)
00064 , nbEventsInBU_(0)
00065 , nbEventsRequested_(0)
00066 , nbEventsBuilt_(0)
00067 , nbEventsSent_(0)
00068 , nbEventsDiscarded_(0)
00069 , mode_("RANDOM")
00070 , replay_(false)
00071 , crc_(true)
00072 , overwriteEvtId_(true)
00073 , firstEvent_(1)
00074 , queueSize_(32)
00075 , eventBufferSize_(0x400000)
00076 , msgBufferSize_(32768)
00077 , fedSizeMax_(65536)
00078 , fedSizeMean_(1024)
00079 , fedSizeWidth_(1024)
00080 , useFixedFedSize_(false)
00081 , monSleepSec_(1)
00082 , gaussianMean_(0.0)
00083 , gaussianWidth_(1.0)
00084 , monLastN_(0)
00085 , monLastSumOfSquares_(0)
00086 , monLastSumOfSizes_(0)
00087 , sumOfSquares_(0)
00088 , sumOfSizes_(0)
00089 , i2oPool_(0)
00090 {
00091
00092 fsm_.initialize<evf::BU>(this);
00093
00094
00095 url_ =
00096 getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00097 getApplicationDescriptor()->getURN();
00098 class_ =getApplicationDescriptor()->getClassName();
00099 instance_=getApplicationDescriptor()->getInstance();
00100 hostname_=getApplicationDescriptor()->getContextDescriptor()->getURL();
00101 sourceId_=class_.toString()+instance_.toString();
00102
00103
00104 i2o::bind(this,&BU::I2O_BU_ALLOCATE_Callback,I2O_BU_ALLOCATE,XDAQ_ORGANIZATION_ID);
00105 i2o::bind(this,&BU::I2O_BU_DISCARD_Callback, I2O_BU_DISCARD, XDAQ_ORGANIZATION_ID);
00106
00107
00108 string i2oPoolName=sourceId_+"_i2oPool";
00109 try {
00110 toolbox::mem::HeapAllocator *allocator=new toolbox::mem::HeapAllocator();
00111 toolbox::net::URN urn("toolbox-mem-pool",i2oPoolName);
00112 toolbox::mem::MemoryPoolFactory* poolFactory=
00113 toolbox::mem::getMemoryPoolFactory();
00114 i2oPool_=poolFactory->createPool(urn,allocator);
00115 }
00116 catch (toolbox::mem::exception::Exception& e) {
00117 string s="Failed to create pool: "+i2oPoolName;
00118 LOG4CPLUS_FATAL(log_,s);
00119 XCEPT_RETHROW(xcept::Exception,s,e);
00120 }
00121
00122
00123 xgi::bind(this,&evf::BU::webPageRequest,"Default");
00124 gui_=new WebGUI(this,&fsm_);
00125 gui_->setSmallAppIcon("/rubuilder/bu/images/bu32x32.gif");
00126 gui_->setLargeAppIcon("/rubuilder/bu/images/bu64x64.gif");
00127
00128 vector<toolbox::lang::Method*> methods=gui_->getMethods();
00129 vector<toolbox::lang::Method*>::iterator it;
00130 for (it=methods.begin();it!=methods.end();++it) {
00131 if ((*it)->type()=="cgi") {
00132 string name=static_cast<xgi::MethodSignature*>(*it)->name();
00133 xgi::bind(this,&evf::BU::webPageRequest,name);
00134 }
00135 }
00136 xgi::bind(this,&evf::BU::customWebPage,"customWebPage");
00137
00138
00139
00140 exportParameters();
00141
00142
00143 fsm_.findRcmsStateListener();
00144
00145
00146 gaussianMean_ =std::log((double)fedSizeMean_);
00147 gaussianWidth_=std::sqrt(std::log
00148 (0.5*
00149 (1+std::sqrt
00150 (1.0+4.0*
00151 fedSizeWidth_.value_*fedSizeWidth_.value_/
00152 fedSizeMean_.value_/fedSizeMean_.value_))));
00153
00154
00155 startMonitoringWorkLoop();
00156
00157
00158 BUEvent::setComputeCrc(crc_.value_);
00159 }
00160
00161
00162
00163 BU::~BU()
00164 {
00165 while (!events_.empty()) { delete events_.back(); events_.pop_back(); }
00166 }
00167
00168
00170
00172
00173
00174 bool BU::configuring(toolbox::task::WorkLoop* wl)
00175 {
00176 isHalting_=false;
00177 try {
00178 LOG4CPLUS_INFO(log_,"Start configuring ...");
00179 reset();
00180 LOG4CPLUS_INFO(log_,"Finished configuring!");
00181 fsm_.fireEvent("ConfigureDone",this);
00182 }
00183 catch (xcept::Exception &e) {
00184 string msg = "configuring FAILED: " + (string)e.what();
00185 fsm_.fireFailed(msg,this);
00186 }
00187
00188 return false;
00189 }
00190
00191
00192
00193 bool BU::enabling(toolbox::task::WorkLoop* wl)
00194 {
00195 isHalting_=false;
00196 try {
00197 LOG4CPLUS_INFO(log_,"Start enabling ...");
00198
00199
00200 if (0!=PlaybackRawDataProvider::instance()) {
00201 for (unsigned int i=0;i<(unsigned int)FEDNumbering::MAXFEDID+1;i++)
00202 if (FEDNumbering::inRange(i)) validFedIds_.push_back(i);
00203 }
00204 else{
00205 for (unsigned int i=0;i<(unsigned int)FEDNumbering::MAXFEDID+1;i++)
00206 if (FEDNumbering::inRangeNoGT(i)) validFedIds_.push_back(i);
00207 }
00208 if (!isBuilding_) startBuildingWorkLoop();
00209 if (!isSending_) startSendingWorkLoop();
00210 LOG4CPLUS_INFO(log_,"Finished enabling!");
00211 fsm_.fireEvent("EnableDone",this);
00212 }
00213 catch (xcept::Exception &e) {
00214 string msg = "enabling FAILED: " + (string)e.what();
00215 fsm_.fireFailed(msg,this);
00216 }
00217
00218 return false;
00219 }
00220
00221
00222
00223 bool BU::stopping(toolbox::task::WorkLoop* wl)
00224 {
00225 try {
00226 LOG4CPLUS_INFO(log_,"Start stopping :) ...");
00227
00228 if (0!=PlaybackRawDataProvider::instance()&&
00229 (!replay_.value_||nbEventsBuilt_<(uint32_t)events_.size())) {
00230 lock();
00231 freeIds_.push(events_.size());
00232 unlock();
00233 postBuild();
00234 while (!builtIds_.empty()) {
00235 LOG4CPLUS_INFO(log_,"wait to flush ... #builtIds="<<builtIds_.size());
00236 ::sleep(1);
00237 }
00238
00239 PlaybackRawDataProvider::instance()->setFreeToEof();
00240 while (!PlaybackRawDataProvider::instance()->areFilesClosed()) usleep(1000000);
00241 usleep(100000);
00242 }
00243
00244 lock();
00245 builtIds_.push(events_.size());
00246 unlock();
00247
00248 postSend();
00249 while (!sentIds_.empty()) {
00250 LOG4CPLUS_INFO(log_,"wait to flush ...");
00251 ::sleep(1);
00252 }
00253 reset();
00254
00255
00256
00257
00258
00259
00260
00261
00262
00263 LOG4CPLUS_INFO(log_,"Finished stopping!");
00264 fsm_.fireEvent("StopDone",this);
00265 }
00266 catch (xcept::Exception &e) {
00267 string msg = "stopping FAILED: " + (string)e.what();
00268 fsm_.fireFailed(msg,this);
00269 }
00270 return false;
00271 }
00272
00273
00274
00275 bool BU::halting(toolbox::task::WorkLoop* wl)
00276 {
00277 try {
00278 LOG4CPLUS_INFO(log_,"Start halting ...");
00279 isHalting_=true;
00280 if (isBuilding_&&isSending_) {
00281 lock();
00282 freeIds_.push(events_.size());
00283 builtIds_.push(events_.size());
00284 unlock();
00285 postBuild();
00286 postSend();
00287 }
00288 if (0!=PlaybackRawDataProvider::instance()&&
00289 (!replay_.value_||nbEventsBuilt_<(uint32_t)events_.size())) {
00290 PlaybackRawDataProvider::instance()->setFreeToEof();
00291 while (!PlaybackRawDataProvider::instance()->areFilesClosed()) usleep(1000000);
00292 usleep(100000);
00293 }
00294 LOG4CPLUS_INFO(log_,"Finished halting!");
00295 fsm_.fireEvent("HaltDone",this);
00296 }
00297 catch (xcept::Exception &e) {
00298 string msg = "halting FAILED: " + (string)e.what();
00299 fsm_.fireFailed(msg,this);
00300 }
00301 return false;
00302 }
00303
00304
00305
00306 xoap::MessageReference BU::fsmCallback(xoap::MessageReference msg)
00307 throw (xoap::exception::Exception)
00308 {
00309 return fsm_.commandCallback(msg);
00310 }
00311
00312
00313
00314 void BU::I2O_BU_ALLOCATE_Callback(toolbox::mem::Reference *bufRef)
00315 {
00316 if (isHalting_) {
00317 LOG4CPLUS_WARN(log_,"Ignore BU_ALLOCATE message while halting.");
00318 bufRef->release();
00319 return;
00320 }
00321
00322 I2O_MESSAGE_FRAME *stdMsg;
00323 I2O_BU_ALLOCATE_MESSAGE_FRAME *msg;
00324
00325 stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
00326 msg =(I2O_BU_ALLOCATE_MESSAGE_FRAME*)stdMsg;
00327
00328 if (0==fuAppDesc_) {
00329 I2O_TID fuTid=stdMsg->InitiatorAddress;
00330 fuAppDesc_=i2o::utils::getAddressMap()->getApplicationDescriptor(fuTid);
00331 }
00332
00333 for (unsigned int i=0;i<msg->n;i++) {
00334 unsigned int fuResourceId=msg->allocate[i].fuTransactionId;
00335 lock();
00336 rqstIds_.push(fuResourceId);
00337 postRqst();
00338 nbEventsRequested_++;
00339 nbEventsInBU_++;
00340 unlock();
00341 }
00342
00343 bufRef->release();
00344 }
00345
00346
00347
00348 void BU::I2O_BU_DISCARD_Callback(toolbox::mem::Reference *bufRef)
00349 {
00350 if (isHalting_) {
00351 LOG4CPLUS_WARN(log_,"Ignore BU_DISCARD message while halting.");
00352 bufRef->release();
00353 return;
00354 }
00355
00356 I2O_MESSAGE_FRAME *stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
00357 I2O_BU_DISCARD_MESSAGE_FRAME*msg =(I2O_BU_DISCARD_MESSAGE_FRAME*)stdMsg;
00358 unsigned int buResourceId=msg->buResourceId[0];
00359
00360 lock();
00361 int result=sentIds_.erase(buResourceId);
00362 unlock();
00363
00364 if (!result) {
00365 LOG4CPLUS_ERROR(log_,"can't discard unknown buResourceId '"<<buResourceId<<"'");
00366 }
00367 else {
00368 lock();
00369 freeIds_.push(buResourceId);
00370 nbEventsDiscarded_.value_++;
00371 unlock();
00372 postBuild();
00373 }
00374
00375 bufRef->release();
00376 }
00377
00378
00379
00380 void BU::actionPerformed(xdata::Event& e)
00381 {
00382 gui_->monInfoSpace()->lock();
00383 if (e.type()=="urn:xdata-event:ItemGroupRetrieveEvent") {
00384 mode_=(0==PlaybackRawDataProvider::instance())?"RANDOM":"PLAYBACK";
00385 if (0!=i2oPool_) memUsedInMB_=i2oPool_->getMemoryUsage().getUsed()*9.53674e-07;
00386 else memUsedInMB_=0.0;
00387 }
00388 else if (e.type()=="ItemChangedEvent") {
00389 string item=dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00390 if (item=="crc") BUEvent::setComputeCrc(crc_.value_);
00391 }
00392 gui_->monInfoSpace()->unlock();
00393 }
00394
00395
00396
00397 void BU::webPageRequest(xgi::Input *in,xgi::Output *out)
00398 throw (xgi::exception::Exception)
00399 {
00400 string name=in->getenv("PATH_INFO");
00401 if (name.empty()) name="defaultWebPage";
00402 static_cast<xgi::MethodSignature*>(gui_->getMethod(name))->invoke(in,out);
00403 }
00404
00405
00406
00407 void BU::customWebPage(xgi::Input*in,xgi::Output*out)
00408 throw (xgi::exception::Exception)
00409 {
00410 *out<<"<html></html>"<<endl;
00411 }
00412
00413
00414
00415 void BU::startBuildingWorkLoop() throw (evf::Exception)
00416 {
00417 try {
00418 LOG4CPLUS_INFO(log_,"Start 'building' workloop");
00419 wlBuilding_=
00420 toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+
00421 "Building",
00422 "waiting");
00423 if (!wlBuilding_->isActive()) wlBuilding_->activate();
00424 asBuilding_=toolbox::task::bind(this,&BU::building,sourceId_+"Building");
00425 wlBuilding_->submit(asBuilding_);
00426 isBuilding_=true;
00427 }
00428 catch (xcept::Exception& e) {
00429 string msg = "Failed to start workloop 'building'.";
00430 XCEPT_RETHROW(evf::Exception,msg,e);
00431 }
00432 }
00433
00434
00435
00436 bool BU::building(toolbox::task::WorkLoop* wl)
00437 {
00438 waitBuild();
00439 lock();
00440 unsigned int buResourceId=freeIds_.front(); freeIds_.pop();
00441 unlock();
00442
00443 if (buResourceId>=(uint32_t)events_.size()) {
00444 LOG4CPLUS_INFO(log_,"shutdown 'building' workloop.");
00445 isBuilding_=false;
00446 return false;
00447 }
00448
00449 if (!isHalting_) {
00450 BUEvent* evt=events_[buResourceId];
00451 if(generateEvent(evt)) {
00452 lock();
00453 nbEventsBuilt_++;
00454 builtIds_.push(buResourceId);
00455 unlock();
00456
00457 postSend();
00458 }
00459 else {
00460 LOG4CPLUS_INFO(log_,"building:received null post");
00461 lock();
00462 unsigned int saveBUResourceId = buResourceId;
00463
00464 freeIds_.push(saveBUResourceId);
00465 unlock();
00466 isBuilding_=false;
00467 return false;
00468 }
00469 }
00470 return true;
00471 }
00472
00473
00474
00475 void BU::startSendingWorkLoop() throw (evf::Exception)
00476 {
00477 try {
00478 LOG4CPLUS_INFO(log_,"Start 'sending' workloop");
00479 wlSending_=toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+
00480 "Sending",
00481 "waiting");
00482 if (!wlSending_->isActive()) wlSending_->activate();
00483
00484 asSending_=toolbox::task::bind(this,&BU::sending,sourceId_+"Sending");
00485 wlSending_->submit(asSending_);
00486 isSending_=true;
00487 }
00488 catch (xcept::Exception& e) {
00489 string msg = "Failed to start workloop 'sending'.";
00490 XCEPT_RETHROW(evf::Exception,msg,e);
00491 }
00492 }
00493
00494
00495
00496 bool BU::sending(toolbox::task::WorkLoop* wl)
00497 {
00498 waitSend();
00499 lock();
00500 unsigned int buResourceId=builtIds_.front(); builtIds_.pop();
00501 unlock();
00502
00503 if (buResourceId>=(uint32_t)events_.size()) {
00504 LOG4CPLUS_INFO(log_,"shutdown 'sending' workloop.");
00505 isSending_=false;
00506 return false;
00507 }
00508
00509 if (!isHalting_) {
00510 waitRqst();
00511 lock();
00512 unsigned int fuResourceId=rqstIds_.front(); rqstIds_.pop();
00513 unlock();
00514
00515 BUEvent* evt=events_[buResourceId];
00516 toolbox::mem::Reference* msg=createMsgChain(evt,fuResourceId);
00517
00518 lock();
00519 sumOfSquares_+=(uint64_t)evt->evtSize()*(uint64_t)evt->evtSize();
00520 sumOfSizes_ +=evt->evtSize();
00521 nbEventsInBU_--;
00522 nbEventsSent_++;
00523 sentIds_.insert(buResourceId);
00524 unlock();
00525
00526 buAppContext_->postFrame(msg,buAppDesc_,fuAppDesc_);
00527 }
00528
00529 return true;
00530 }
00531
00532
00533
00534 void BU::startMonitoringWorkLoop() throw (evf::Exception)
00535 {
00536 struct timezone timezone;
00537 gettimeofday(&monStartTime_,&timezone);
00538
00539 try {
00540 LOG4CPLUS_INFO(log_,"Start 'monitoring' workloop");
00541 wlMonitoring_=
00542 toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+
00543 "Monitoring",
00544 "waiting");
00545 if (!wlMonitoring_->isActive()) wlMonitoring_->activate();
00546 asMonitoring_=toolbox::task::bind(this,&BU::monitoring,sourceId_+"Monitoring");
00547 wlMonitoring_->submit(asMonitoring_);
00548 }
00549 catch (xcept::Exception& e) {
00550 string msg = "Failed to start workloop 'monitoring'.";
00551 XCEPT_RETHROW(evf::Exception,msg,e);
00552 }
00553 }
00554
00555
00556
00557 bool BU::monitoring(toolbox::task::WorkLoop* wl)
00558 {
00559 struct timeval monEndTime;
00560 struct timezone timezone;
00561
00562 gettimeofday(&monEndTime,&timezone);
00563
00564 lock();
00565 unsigned int monN =nbEventsBuilt_.value_;
00566 uint64_t monSumOfSquares=sumOfSquares_;
00567 unsigned int monSumOfSizes =sumOfSizes_;
00568 uint64_t deltaSumOfSquares;
00569 unlock();
00570
00571 gui_->monInfoSpace()->lock();
00572
00573 deltaT_.value_=deltaT(&monStartTime_,&monEndTime);
00574 monStartTime_=monEndTime;
00575
00576 deltaN_.value_=monN-monLastN_;
00577 monLastN_=monN;
00578
00579 deltaSumOfSquares=monSumOfSquares-monLastSumOfSquares_;
00580 deltaSumOfSquares_.value_=(double)deltaSumOfSquares;
00581 monLastSumOfSquares_=monSumOfSquares;
00582
00583 deltaSumOfSizes_.value_=monSumOfSizes-monLastSumOfSizes_;
00584 monLastSumOfSizes_=monSumOfSizes;
00585
00586 if (deltaT_.value_!=0) {
00587 throughput_=deltaSumOfSizes_.value_/deltaT_.value_;
00588 rate_ =deltaN_.value_/deltaT_.value_;
00589 }
00590 else {
00591 throughput_=0.0;
00592 rate_ =0.0;
00593 }
00594
00595 double meanOfSquares,mean,squareOfMean,variance;
00596
00597 if(deltaN_.value_!=0) {
00598 meanOfSquares=deltaSumOfSquares_.value_/((double)(deltaN_.value_));
00599 mean=((double)(deltaSumOfSizes_.value_))/((double)(deltaN_.value_));
00600 squareOfMean=mean*mean;
00601 variance=meanOfSquares-squareOfMean;
00602 average_=deltaSumOfSizes_.value_/deltaN_.value_;
00603 rms_ =std::sqrt(variance);
00604 }
00605 else {
00606 average_=0.0;
00607 rms_ =0.0;
00608 }
00609
00610 gui_->monInfoSpace()->unlock();
00611
00612 ::sleep(monSleepSec_.value_);
00613
00614 return true;
00615 }
00616
00617
00618
00620
00622
00623
00624 void BU::exportParameters()
00625 {
00626 if (0==gui_) {
00627 LOG4CPLUS_ERROR(log_,"No GUI, can't export parameters");
00628 return;
00629 }
00630
00631 gui_->addMonitorParam("url", &url_);
00632 gui_->addMonitorParam("class", &class_);
00633 gui_->addMonitorParam("instance", &instance_);
00634 gui_->addMonitorParam("hostname", &hostname_);
00635 gui_->addMonitorParam("runNumber", &runNumber_);
00636 gui_->addMonitorParam("stateName", fsm_.stateName());
00637 gui_->addMonitorParam("memUsedInMB", &memUsedInMB_);
00638 gui_->addMonitorParam("deltaT", &deltaT_);
00639 gui_->addMonitorParam("deltaN", &deltaN_);
00640 gui_->addMonitorParam("deltaSumOfSquares", &deltaSumOfSquares_);
00641 gui_->addMonitorParam("deltaSumOfSizes", &deltaSumOfSizes_);
00642 gui_->addMonitorParam("throughput", &throughput_);
00643 gui_->addMonitorParam("average", &average_);
00644 gui_->addMonitorParam("rate", &rate_);
00645 gui_->addMonitorParam("rms", &rms_);
00646
00647 gui_->addMonitorCounter("nbEvtsInBU", &nbEventsInBU_);
00648 gui_->addMonitorCounter("nbEvtsRequested", &nbEventsRequested_);
00649 gui_->addMonitorCounter("nbEvtsBuilt", &nbEventsBuilt_);
00650 gui_->addMonitorCounter("nbEvtsSent", &nbEventsSent_);
00651 gui_->addMonitorCounter("nbEvtsDiscarded", &nbEventsDiscarded_);
00652
00653 gui_->addStandardParam("mode", &mode_);
00654 gui_->addStandardParam("replay", &replay_);
00655 gui_->addStandardParam("overwriteEvtId", &overwriteEvtId_);
00656 gui_->addStandardParam("crc", &crc_);
00657 gui_->addStandardParam("firstEvent", &firstEvent_);
00658 gui_->addStandardParam("queueSize", &queueSize_);
00659 gui_->addStandardParam("eventBufferSize", &eventBufferSize_);
00660 gui_->addStandardParam("msgBufferSize", &msgBufferSize_);
00661 gui_->addStandardParam("fedSizeMax", &fedSizeMax_);
00662 gui_->addStandardParam("fedSizeMean", &fedSizeMean_);
00663 gui_->addStandardParam("fedSizeWidth", &fedSizeWidth_);
00664 gui_->addStandardParam("useFixedFedSize", &useFixedFedSize_);
00665 gui_->addStandardParam("monSleepSec", &monSleepSec_);
00666 gui_->addStandardParam("rcmsStateListener", fsm_.rcmsStateListener());
00667 gui_->addStandardParam("foundRcmsStateListener",fsm_.foundRcmsStateListener());
00668
00669
00670 gui_->exportParameters();
00671
00672 gui_->addItemChangedListener("crc",this);
00673
00674 }
00675
00676
00677
00678 void BU::reset()
00679 {
00680 gui_->resetCounters();
00681
00682 deltaT_ =0.0;
00683 deltaN_ = 0;
00684 deltaSumOfSquares_ = 0;
00685 deltaSumOfSizes_ = 0;
00686
00687 throughput_ =0.0;
00688 average_ = 0;
00689 rate_ = 0;
00690 rms_ = 0;
00691
00692 monLastN_ = 0;
00693 monLastSumOfSquares_= 0;
00694 monLastSumOfSizes_ = 0;
00695
00696 while (events_.size()) {
00697 delete events_.back();
00698 events_.pop_back();
00699 }
00700
00701 while (!rqstIds_.empty()) rqstIds_.pop();
00702 while (!freeIds_.empty()) freeIds_.pop();
00703 while (!builtIds_.empty()) builtIds_.pop();
00704 sentIds_.clear();
00705
00706 sem_init(&lock_,0,1);
00707 sem_init(&buildSem_,0,queueSize_);
00708 sem_init(&sendSem_,0,0);
00709 sem_init(&rqstSem_,0,0);
00710
00711 for (unsigned int i=0;i<queueSize_;i++) {
00712 events_.push_back(new BUEvent(i,eventBufferSize_));
00713 freeIds_.push(i);
00714 }
00715 validFedIds_.clear();
00716 }
00717
00718
00719 double BU::deltaT(const struct timeval *start,const struct timeval *end)
00720 {
00721 unsigned int sec;
00722 unsigned int usec;
00723
00724 sec = end->tv_sec - start->tv_sec;
00725
00726 if(end->tv_usec > start->tv_usec) {
00727 usec = end->tv_usec - start->tv_usec;
00728 }
00729 else {
00730 sec--;
00731 usec = 1000000 - ((unsigned int )(start->tv_usec - end->tv_usec));
00732 }
00733
00734 return ((double)sec) + ((double)usec) / 1000000.0;
00735 }
00736
00737
00738
00739 bool BU::generateEvent(BUEvent* evt)
00740 {
00741
00742 if (replay_.value_&&nbEventsBuilt_>=(uint32_t)events_.size())
00743 {
00744 if (0!=PlaybackRawDataProvider::instance())
00745 PlaybackRawDataProvider::instance()->setFreeToEof();
00746 return true;
00747 }
00748
00749 if (0!=PlaybackRawDataProvider::instance()) {
00750
00751 unsigned int runNumber,evtNumber;
00752
00753 FEDRawDataCollection* event=
00754 PlaybackRawDataProvider::instance()->getFEDRawData(runNumber,evtNumber);
00755 if(event == 0) return false;
00756 evt->initialize(evtNumber);
00757
00758 for (unsigned int i=0;i<validFedIds_.size();i++) {
00759 unsigned int fedId =validFedIds_[i];
00760 unsigned int fedSize=event->FEDData(fedId).size();
00761 unsigned char* fedAddr=event->FEDData(fedId).data();
00762 if (overwriteEvtId_.value_ && fedAddr != 0) {
00763 fedh_t *fedHeader=(fedh_t*)fedAddr;
00764 fedHeader->eventid=(fedHeader->eventid&0xFF000000)+(evtNumber&0x00FFFFFF);
00765 }
00766 if (fedSize>0) evt->writeFed(fedId,fedAddr,fedSize);
00767 }
00768 delete event;
00769 }
00770
00771 else {
00772 unsigned int evtNumber=(firstEvent_+evtNumber_++)%0x1000000;
00773 evt->initialize(evtNumber);
00774 unsigned int fedSizeMin=fedHeaderSize_+fedTrailerSize_;
00775 for (unsigned int i=0;i<validFedIds_.size();i++) {
00776 unsigned int fedId(validFedIds_[i]);
00777 unsigned int fedSize(fedSizeMean_);
00778 if (!useFixedFedSize_) {
00779 double logFedSize=CLHEP::RandGauss::shoot(gaussianMean_,gaussianWidth_);
00780 fedSize=(unsigned int)(std::exp(logFedSize));
00781 if (fedSize<fedSizeMin) fedSize=fedSizeMin;
00782 if (fedSize>fedSizeMax_) fedSize=fedSizeMax_;
00783 fedSize-=fedSize%8;
00784 }
00785
00786 evt->writeFed(fedId,0,fedSize);
00787 evt->writeFedHeader(i);
00788 evt->writeFedTrailer(i);
00789 }
00790
00791 }
00792 return true;
00793 }
00794
00795
00796
// Build the chain of I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME messages (function
// code I2O_FU_TAKE) which carries the event 'evt' to the requester identified
// by 'fuResourceId'.
//
// The feds of the event are distributed over at most 64 super-fragments.
// Each super-fragment is cut into blocks; every block is a buffer of
// msgBufferSize_ bytes taken from i2oPool_, holding the message header, an
// frl header (frlh_t) and fed data. All buffers are linked through
// setNextReference().
//
// Returns the first buffer of the chain; 0 if the event contains no fed
// (the super-fragment loop is then never entered).
//
// NOTE(review): if getFrame() throws, the failure is only logged and bufRef
// keeps its previous value (0 for the very first block) but is dereferenced
// right afterwards -- confirm how an allocation failure should be handled.
00797 toolbox::mem::Reference *BU::createMsgChain(BUEvent* evt,
00798 unsigned int fuResourceId)
00799 {
00800 unsigned int msgHeaderSize =sizeof(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME);
00801 unsigned int msgPayloadSize=msgBufferSize_-msgHeaderSize;
00802
// MessageSize below is expressed in units of 4 bytes (size >> 2); a payload
// which is not a multiple of 4 is reported but processing continues
00803 if((msgPayloadSize%4)!=0) LOG4CPLUS_ERROR(log_,"Invalid Payload Size.");
00804
00805 toolbox::mem::Reference *head =0;
00806 toolbox::mem::Reference *tail =0;
00807 toolbox::mem::Reference *bufRef=0;
00808
00809 I2O_MESSAGE_FRAME *stdMsg=0;
00810 I2O_PRIVATE_MESSAGE_FRAME *pvtMsg=0;
00811 I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block =0;
00812
// the two values derived from validFedIds_ are overwritten by both branches
// of the if/else that follows
00813 unsigned int iFed =0;
00814 unsigned int nSuperFrag =64;
00815 unsigned int nFedPerSuperFrag=validFedIds_.size()/nSuperFrag;
00816 unsigned int nBigSuperFrags =validFedIds_.size()%nSuperFrag;
00817
// fewer feds than super-fragments: one fed per super-fragment; otherwise the
// first nBigSuperFrags super-fragments receive one fed more than the others
00818 if (evt->nFed()<nSuperFrag) {
00819 nSuperFrag=evt->nFed();
00820 nFedPerSuperFrag=1;
00821 nBigSuperFrags=0;
00822 }
00823 else
00824 {
00825 nFedPerSuperFrag=evt->nFed()/nSuperFrag;
00826 nBigSuperFrags =evt->nFed()%nSuperFrag;
00827 }
00828
// loop over the super-fragments
00829 for (unsigned int iSuperFrag=0;iSuperFrag<nSuperFrag;iSuperFrag++) {
00830
00831
// nFed is the index one past the last fed of this super-fragment
00832 unsigned int nFed=iFed+nFedPerSuperFrag;
00833 if (iSuperFrag<nBigSuperFrags) ++nFed;
00834
00835
// estimate the number of blocks needed for this super-fragment: fed sizes
// plus one frl header per block
00836 unsigned int nBlock =0;
00837 unsigned int curbSize=frlHeaderSize_;
00838 unsigned int totSize =curbSize;
00839 for (unsigned int i=iFed;i<nFed;i++) {
00840 curbSize+=evt->fedSize(i);
00841 totSize+=evt->fedSize(i);
00842 if (curbSize>msgPayloadSize) {
00843 curbSize+=frlHeaderSize_*(curbSize/msgPayloadSize);
00844 if(curbSize%msgPayloadSize)totSize+=frlHeaderSize_*(curbSize/msgPayloadSize);
00845 else totSize+=frlHeaderSize_*((curbSize/msgPayloadSize)-1);
00846 curbSize=curbSize%msgPayloadSize;
00847 }
00848 }
00849 nBlock=totSize/msgPayloadSize+(totSize%msgPayloadSize>0 ? 1 : 0);
00850
00851
00852
// state carried from one block to the next:
//   remainder      - bytes of fed iFed which still have to be copied
//   fedTrailerLeft - only the trailer of fed iFed is still missing
//   last           - the last segment of the super-fragment has been marked
//   warning        - the block estimate above turned out to be too small
00853 unsigned int remainder =0;
00854 bool fedTrailerLeft=false;
00855 bool last =false;
00856 bool warning =false;
00857 unsigned char *startOfPayload=0;
00858 U32 payload(0);
00859
// loop over the blocks of this super-fragment
00860 for(unsigned int iBlock=0;iBlock<nBlock;iBlock++) {
00861
00862
00863 payload=msgPayloadSize;
00864
00865
// take a fresh buffer from the pool (see the review note at the top about
// the failure case)
00866 try {
00867 bufRef=toolbox::mem::getMemoryPoolFactory()->getFrame(i2oPool_,
00868 msgBufferSize_);
00869 }
00870 catch(xcept::Exception &e) {
00871 LOG4CPLUS_FATAL(log_,"xdaq::frameAlloc failed");
00872 }
00873
00874
// the same memory is addressed as standard, private and event data block
// message frame; fill in the header fields
00875 stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
00876 pvtMsg=(I2O_PRIVATE_MESSAGE_FRAME*)stdMsg;
00877 block =(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)stdMsg;
00878
00879 pvtMsg->XFunctionCode =I2O_FU_TAKE;
00880 pvtMsg->OrganizationID =XDAQ_ORGANIZATION_ID;
00881
00882 stdMsg->MessageSize =(msgHeaderSize + payload) >> 2;
00883 stdMsg->Function =I2O_PRIVATE_MESSAGE;
00884 stdMsg->VersionOffset =0;
00885 stdMsg->MsgFlags =0;
00886 stdMsg->InitiatorAddress=i2o::utils::getAddressMap()->getTid(buAppDesc_);
00887 stdMsg->TargetAddress =i2o::utils::getAddressMap()->getTid(fuAppDesc_);
00888
00889 block->buResourceId =evt->buResourceId();
00890 block->fuTransactionId =fuResourceId;
00891 block->blockNb =iBlock;
00892 block->nbBlocksInSuperFragment=nBlock;
00893 block->superFragmentNb =iSuperFrag;
00894 block->nbSuperFragmentsInEvent=nSuperFrag;
00895 block->eventNumber =evt->evtNumber();
00896
00897
// the payload starts with the frl header; the fed data follows it
00898 startOfPayload =(unsigned char*)block+msgHeaderSize;
00899 frlh_t* frlHeader=(frlh_t*)startOfPayload;
00900 frlHeader->trigno=evt->evtNumber();
00901 frlHeader->segno =iBlock;
00902
00903 unsigned char *startOfFedBlocks=startOfPayload+frlHeaderSize_;
00904 payload -=frlHeaderSize_;
00905 frlHeader->segsize =payload;
00906 unsigned int leftspace=payload;
00907
00908
// the previous block ended just before the trailer of fed iFed: copy the
// trailer first
00909 if(fedTrailerLeft) {
00910 memcpy(startOfFedBlocks,
00911 evt->fedAddr(iFed)+evt->fedSize(iFed)-fedTrailerSize_,
00912 fedTrailerSize_);
00913
00914 startOfFedBlocks+=fedTrailerSize_;
00915 leftspace -=fedTrailerSize_;
00916 remainder =0;
00917 fedTrailerLeft =false;
00918
00919
// that was the last fed of the super-fragment: shrink the message to the
// space actually used and flag the segment as the last one
00920 if((iFed==nFed-1) && !last) {
00921 frlHeader->segsize-=leftspace;
00922 int msgSize=stdMsg->MessageSize << 2;
00923 msgSize -=leftspace;
00924 bufRef->setDataSize(msgSize);
00925 stdMsg->MessageSize = msgSize >> 2;
00926 frlHeader->segsize=frlHeader->segsize | FRL_LAST_SEGM;
00927 last=true;
00928 }
00929
00930
00931 iFed++;
00932 }
00933
// part of fed iFed is left over from the previous block
00937 if (remainder>0) {
00938
00939
// case 1: the rest of the fed fits completely into this block
00940 if(payload>=remainder) {
00941 memcpy(startOfFedBlocks,
00942 evt->fedAddr(iFed)+evt->fedSize(iFed)-remainder,
00943 remainder);
00944
00945 startOfFedBlocks+=remainder;
00946 leftspace -=remainder;
00947
00948
00949 if(iFed==nFed-1) {
00950 frlHeader->segsize-=leftspace;
00951 int msgSize=stdMsg->MessageSize << 2;
00952 msgSize -=leftspace;
00953 bufRef->setDataSize(msgSize);
00954 stdMsg->MessageSize = msgSize >> 2;
00955 frlHeader->segsize=frlHeader->segsize | FRL_LAST_SEGM;
00956 last=true;
00957 }
00958
00959
00960 iFed++;
00961
00962
00963 remainder=0;
00964 }
00965
// case 2: everything but the trailer fits; the trailer goes into the next
// block
00966 else if (payload>=(remainder-fedTrailerSize_)) {
00967 memcpy(startOfFedBlocks,
00968 evt->fedAddr(iFed)+evt->fedSize(iFed)-remainder,
00969 remainder-fedTrailerSize_);
00970
00971 frlHeader->segsize=remainder-fedTrailerSize_;
00972 fedTrailerLeft =true;
00973 leftspace -=(remainder-fedTrailerSize_);
00974 remainder =fedTrailerSize_;
00975 }
00976
// case 3: the block is filled completely and the fed continues in the next
// block
00977 else {
00978 memcpy(startOfFedBlocks,
00979 evt->fedAddr(iFed)+evt->fedSize(iFed)-remainder,payload);
00980 remainder-=payload;
00981 leftspace =0;
00982 }
00983 }
00984
// nothing pending any more: start copying further feds into this block
00988 if(remainder==0) {
00989
00990
00991 while(iFed<nFed) {
00992
00993
// a fed header is never split: close the block if it does not fit
00994 if((int)leftspace<fedHeaderSize_) {
00995 frlHeader->segsize-=leftspace;
00996 break;
00997 }
00998
00999 memcpy(startOfFedBlocks,evt->fedAddr(iFed),fedHeaderSize_);
01000
01001 leftspace -=fedHeaderSize_;
01002 startOfFedBlocks+=fedHeaderSize_;
01003
01004
// the rest of the fed (data and trailer) fits into the block
01005 if(evt->fedSize(iFed)-fedHeaderSize_<=leftspace) {
01006 memcpy(startOfFedBlocks,
01007 evt->fedAddr(iFed)+fedHeaderSize_,
01008 evt->fedSize(iFed)-fedHeaderSize_);
01009
01010 leftspace -=(evt->fedSize(iFed)-fedHeaderSize_);
01011 startOfFedBlocks+=(evt->fedSize(iFed)-fedHeaderSize_);
01012 }
01013
// the data fits but the trailer does not: the trailer goes into the next
// block
01014 else if(evt->fedSize(iFed)-fedHeaderSize_-fedTrailerSize_<=leftspace) {
01015 memcpy(startOfFedBlocks,
01016 evt->fedAddr(iFed)+fedHeaderSize_,
01017 evt->fedSize(iFed)-fedHeaderSize_-fedTrailerSize_);
01018
01019 leftspace -=(evt->fedSize(iFed)-fedHeaderSize_-fedTrailerSize_);
01020 frlHeader->segsize-=leftspace;
01021 fedTrailerLeft =true;
01022 remainder =fedTrailerSize_;
01023
01024 break;
01025 }
01026
// fill the block and remember how much of the fed is left for the next one
01027 else {
01028 memcpy(startOfFedBlocks,evt->fedAddr(iFed)+fedHeaderSize_,leftspace);
01029 remainder=evt->fedSize(iFed)-fedHeaderSize_-leftspace;
01030 leftspace=0;
01031
01032 break;
01033 }
01034
01035
01036 iFed++;
01037
01038 }
01039
01040
// all feds of the super-fragment are copied: shrink the message to the space
// actually used and flag the segment as the last one
01041 if (iFed==nFed && remainder==0 && !last) {
01042 frlHeader->segsize-=leftspace;
01043 int msgSize=stdMsg->MessageSize << 2;
01044 msgSize -=leftspace;
01045 bufRef->setDataSize(msgSize);
01046 stdMsg->MessageSize=msgSize >> 2;
01047 frlHeader->segsize =frlHeader->segsize | FRL_LAST_SEGM;
01048 last=true;
01049 }
01050
01051 }
01052
// append the block to the chain
01053 if(iSuperFrag==0&&iBlock==0) {
01054 head=bufRef;
01055 tail=bufRef;
01056 }
01057 else {
01058 tail->setNextReference(bufRef);
01059 tail=bufRef;
01060 }
01061
// data is left after the last estimated block: add one more block
01062 if((iBlock==nBlock-1) && remainder!=0) {
01063 nBlock++;
01064 warning=true;
01065 }
01066
01067 }
01068
01069
// the number of blocks changed after the headers were written: walk the
// chain and correct the count in every block of this super-fragment
01070 if(warning) {
01071 toolbox::mem::Reference* next=head;
01072 do {
01073 block =(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)next->getDataLocation();
01074 if (block->superFragmentNb==iSuperFrag)
01075 block->nbBlocksInSuperFragment=nBlock;
01076 } while((next=next->getNextReference()));
01077 }
01078
01079 }
01080
01081 return head;
01082 }
01083
01084
01085 void BU::dumpFrame(unsigned char* data,unsigned int len)
01086 {
01087 char left1[20];
01088 char left2[20];
01089 char right1[20];
01090 char right2[20];
01091
01092 printf("Byte 0 1 2 3 4 5 6 7\n");
01093
01094 int c(0);
01095 int pos(0);
01096
01097 for (unsigned int i=0;i<(len/8);i++) {
01098 int rpos(0);
01099 int off(3);
01100 for (pos=0;pos<12;pos+=3) {
01101 sprintf(&left1[pos],"%2.2x ",
01102 ((unsigned char*)data)[c+off]);
01103 sprintf(&right1[rpos],"%1c",
01104 ((data[c+off] > 32)&&(data[c+off] < 127)) ? data[c+off] : '.');
01105 sprintf (&left2[pos],"%2.2x ",
01106 ((unsigned char*)data)[c+off+4]);
01107 sprintf (&right2[rpos],"%1c",
01108 ((data[c+off+4] > 32)&&(data[c+off+4]<127)) ? data[c+off+4] : '.');
01109 rpos++;
01110 off--;
01111 }
01112 c+=8;
01113
01114 printf ("%4d: %s%s || %s%s %p\n",
01115 c-8, left1, left2, right1, right2, &data[c-8]);
01116 }
01117
01118 fflush(stdout);
01119 }
01120
01121
01123
01125
// Macro invocation for class BU; the macro is not defined in this file.
// NOTE(review): presumably this generates the function the XDAQ framework
// uses to instantiate the application -- confirm against the XDAQ headers.
01126 XDAQ_INSTANTIATOR_IMPL(BU)