00001
00002
00003
00004
00005
00006
00007
00009
00010
00011 #include "EventFilter/AutoBU/interface/BU.h"
00012
00013 #include "FWCore/Utilities/interface/CRC16.h"
00014
00015 #include "DataFormats/FEDRawData/interface/FEDNumbering.h"
00016 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
00017 #include "EventFilter/FEDInterface/interface/GlobalEventNumber.icc"
00018
00019 #include "xoap/SOAPEnvelope.h"
00020 #include "xoap/SOAPBody.h"
00021 #include "xoap/domutils.h"
00022
00023 #include <netinet/in.h>
00024 #include <sstream>
00025
00026
00027 using namespace std;
00028 using namespace evf;
00029
00030
00032
00034
00035
00036 BU::BU(xdaq::ApplicationStub *s)
00037 : xdaq::Application(s)
00038 , log_(getApplicationLogger())
00039 , buAppDesc_(getApplicationDescriptor())
00040 , fuAppDesc_(0)
00041 , buAppContext_(getApplicationContext())
00042 , fsm_(this)
00043 , gui_(0)
00044 , evtNumber_(0)
00045 , isBuilding_(false)
00046 , isSending_(false)
00047 , isHalting_(false)
00048 , wlBuilding_(0)
00049 , asBuilding_(0)
00050 , wlSending_(0)
00051 , asSending_(0)
00052 , wlMonitoring_(0)
00053 , asMonitoring_(0)
00054 , instance_(0)
00055 , runNumber_(0)
00056 , memUsedInMB_(0.0)
00057 , deltaT_(0.0)
00058 , deltaN_(0)
00059 , deltaSumOfSquares_(0)
00060 , deltaSumOfSizes_(0)
00061 , throughput_(0.0)
00062 , average_(0.0)
00063 , rate_(0.0)
00064 , rms_(0.0)
00065 , nbEventsInBU_(0)
00066 , nbEventsRequested_(0)
00067 , nbEventsBuilt_(0)
00068 , nbEventsSent_(0)
00069 , nbEventsDiscarded_(0)
00070 , mode_("RANDOM")
00071 , replay_(false)
00072 , crc_(true)
00073 , overwriteEvtId_(true)
00074 , overwriteLsId_(false)
00075 , fakeLsUpdateSecs_(23)
00076 , firstEvent_(1)
00077 , queueSize_(32)
00078 , eventBufferSize_(0x400000)
00079 , msgBufferSize_(32768)
00080 , fedSizeMax_(65536)
00081 , fedSizeMean_(1024)
00082 , fedSizeWidth_(1024)
00083 , useFixedFedSize_(false)
00084 , monSleepSec_(1)
00085 , fakeLs_(0)
00086 , gaussianMean_(0.0)
00087 , gaussianWidth_(1.0)
00088 , monLastN_(0)
00089 , monLastSumOfSquares_(0)
00090 , monLastSumOfSizes_(0)
00091 , sumOfSquares_(0)
00092 , sumOfSizes_(0)
00093 , i2oPool_(0)
00094 {
00095
00096 fsm_.initialize<evf::BU>(this);
00097
00098
00099 url_ =
00100 getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00101 getApplicationDescriptor()->getURN();
00102 class_ =getApplicationDescriptor()->getClassName();
00103 instance_=getApplicationDescriptor()->getInstance();
00104 hostname_=getApplicationDescriptor()->getContextDescriptor()->getURL();
00105 sourceId_=class_.toString()+instance_.toString();
00106
00107
00108 i2o::bind(this,&BU::I2O_BU_ALLOCATE_Callback,I2O_BU_ALLOCATE,XDAQ_ORGANIZATION_ID);
00109 i2o::bind(this,&BU::I2O_BU_DISCARD_Callback, I2O_BU_DISCARD, XDAQ_ORGANIZATION_ID);
00110
00111
00112 string i2oPoolName=sourceId_+"_i2oPool";
00113 try {
00114 toolbox::mem::HeapAllocator *allocator=new toolbox::mem::HeapAllocator();
00115 toolbox::net::URN urn("toolbox-mem-pool",i2oPoolName);
00116 toolbox::mem::MemoryPoolFactory* poolFactory=
00117 toolbox::mem::getMemoryPoolFactory();
00118 i2oPool_=poolFactory->createPool(urn,allocator);
00119 }
00120 catch (toolbox::mem::exception::Exception& e) {
00121 string s="Failed to create pool: "+i2oPoolName;
00122 LOG4CPLUS_FATAL(log_,s);
00123 XCEPT_RETHROW(xcept::Exception,s,e);
00124 }
00125
00126
00127 xgi::bind(this,&evf::BU::webPageRequest,"Default");
00128 gui_=new WebGUI(this,&fsm_);
00129 gui_->setSmallAppIcon("/rubuilder/bu/images/bu32x32.gif");
00130 gui_->setLargeAppIcon("/rubuilder/bu/images/bu64x64.gif");
00131
00132 vector<toolbox::lang::Method*> methods=gui_->getMethods();
00133 vector<toolbox::lang::Method*>::iterator it;
00134 for (it=methods.begin();it!=methods.end();++it) {
00135 if ((*it)->type()=="cgi") {
00136 string name=static_cast<xgi::MethodSignature*>(*it)->name();
00137 xgi::bind(this,&evf::BU::webPageRequest,name);
00138 }
00139 }
00140 xgi::bind(this,&evf::BU::customWebPage,"customWebPage");
00141
00142
00143
00144 exportParameters();
00145
00146
00147 fsm_.findRcmsStateListener();
00148
00149
00150 gaussianMean_ =std::log((double)fedSizeMean_);
00151 gaussianWidth_=std::sqrt(std::log
00152 (0.5*
00153 (1+std::sqrt
00154 (1.0+4.0*
00155 fedSizeWidth_.value_*fedSizeWidth_.value_/
00156 fedSizeMean_.value_/fedSizeMean_.value_))));
00157
00158
00159 startMonitoringWorkLoop();
00160
00161
00162 BUEvent::setComputeCrc(crc_.value_);
00163 }
00164
00165
00166
00167 BU::~BU()
00168 {
00169 while (!events_.empty()) { delete events_.back(); events_.pop_back(); }
00170 }
00171
00172
00174
00176
00177
00178 bool BU::configuring(toolbox::task::WorkLoop* wl)
00179 {
00180 isHalting_=false;
00181 try {
00182 LOG4CPLUS_INFO(log_,"Start configuring ...");
00183 reset();
00184 LOG4CPLUS_INFO(log_,"Finished configuring!");
00185 fsm_.fireEvent("ConfigureDone",this);
00186 }
00187 catch (xcept::Exception &e) {
00188 string msg = "configuring FAILED: " + (string)e.what();
00189 fsm_.fireFailed(msg,this);
00190 }
00191
00192 return false;
00193 }
00194
00195
00196
00197 bool BU::enabling(toolbox::task::WorkLoop* wl)
00198 {
00199 isHalting_=false;
00200 try {
00201 LOG4CPLUS_INFO(log_,"Start enabling ...");
00202
00203
00204 if (0!=PlaybackRawDataProvider::instance()) {
00205 for (unsigned int i=0;i<(unsigned int)FEDNumbering::MAXFEDID+1;i++)
00206 if (FEDNumbering::inRange(i)) validFedIds_.push_back(i);
00207 }
00208 else{
00209 for (unsigned int i=0;i<(unsigned int)FEDNumbering::MAXFEDID+1;i++)
00210 if (FEDNumbering::inRangeNoGT(i)) validFedIds_.push_back(i);
00211 }
00212 if (!isBuilding_) startBuildingWorkLoop();
00213 if (!isSending_) startSendingWorkLoop();
00214 LOG4CPLUS_INFO(log_,"Finished enabling!");
00215 fsm_.fireEvent("EnableDone",this);
00216 }
00217 catch (xcept::Exception &e) {
00218 string msg = "enabling FAILED: " + (string)e.what();
00219 fsm_.fireFailed(msg,this);
00220 }
00221
00222 return false;
00223 }
00224
00225
00226
00227 bool BU::stopping(toolbox::task::WorkLoop* wl)
00228 {
00229 try {
00230 LOG4CPLUS_INFO(log_,"Start stopping :) ...");
00231
00232 if (0!=PlaybackRawDataProvider::instance()) {
00233
00234 lock();
00235 freeIds_.push(events_.size());
00236 unlock();
00237 postBuild();
00238 while (!builtIds_.empty()) {
00239 LOG4CPLUS_INFO(log_,"wait to flush ... #builtIds="<<builtIds_.size());
00240 ::sleep(1);
00241 }
00242
00243 PlaybackRawDataProvider::instance()->setFreeToEof();
00244 while (!PlaybackRawDataProvider::instance()->areFilesClosed()) usleep(1000000);
00245 usleep(100000);
00246 }
00247
00248 lock();
00249 builtIds_.push(events_.size());
00250 unlock();
00251
00252 postSend();
00253 while (!sentIds_.empty()) {
00254 LOG4CPLUS_INFO(log_,"wait to flush ...");
00255 ::sleep(1);
00256 }
00257 reset();
00258
00259
00260
00261
00262
00263
00264
00265
00266
00267
00268 LOG4CPLUS_INFO(log_,"Finished stopping!");
00269 fsm_.fireEvent("StopDone",this);
00270 }
00271 catch (xcept::Exception &e) {
00272 string msg = "stopping FAILED: " + (string)e.what();
00273 fsm_.fireFailed(msg,this);
00274 }
00275 return false;
00276 }
00277
00278
00279
00280 bool BU::halting(toolbox::task::WorkLoop* wl)
00281 {
00282 try {
00283 LOG4CPLUS_INFO(log_,"Start halting ...");
00284 isHalting_=true;
00285 if (isBuilding_&&isSending_) {
00286 lock();
00287 freeIds_.push(events_.size());
00288 builtIds_.push(events_.size());
00289 unlock();
00290 postBuild();
00291 postSend();
00292 }
00293 if (0!=PlaybackRawDataProvider::instance()&&
00294 (!replay_.value_||nbEventsBuilt_<(uint32_t)events_.size())) {
00295 PlaybackRawDataProvider::instance()->setFreeToEof();
00296 while (!PlaybackRawDataProvider::instance()->areFilesClosed()) usleep(1000000);
00297 usleep(100000);
00298 }
00299 LOG4CPLUS_INFO(log_,"Finished halting!");
00300 fsm_.fireEvent("HaltDone",this);
00301 }
00302 catch (xcept::Exception &e) {
00303 string msg = "halting FAILED: " + (string)e.what();
00304 fsm_.fireFailed(msg,this);
00305 }
00306 return false;
00307 }
00308
00309
00310
00311 xoap::MessageReference BU::fsmCallback(xoap::MessageReference msg)
00312 throw (xoap::exception::Exception)
00313 {
00314 return fsm_.commandCallback(msg);
00315 }
00316
00317
00318
00319 void BU::I2O_BU_ALLOCATE_Callback(toolbox::mem::Reference *bufRef)
00320 {
00321 if (isHalting_) {
00322 LOG4CPLUS_WARN(log_,"Ignore BU_ALLOCATE message while halting.");
00323 bufRef->release();
00324 return;
00325 }
00326
00327 I2O_MESSAGE_FRAME *stdMsg;
00328 I2O_BU_ALLOCATE_MESSAGE_FRAME *msg;
00329
00330 stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
00331 msg =(I2O_BU_ALLOCATE_MESSAGE_FRAME*)stdMsg;
00332
00333 if (0==fuAppDesc_) {
00334 I2O_TID fuTid=stdMsg->InitiatorAddress;
00335 fuAppDesc_=i2o::utils::getAddressMap()->getApplicationDescriptor(fuTid);
00336 }
00337
00338 for (unsigned int i=0;i<msg->n;i++) {
00339 unsigned int fuResourceId=msg->allocate[i].fuTransactionId;
00340 lock();
00341 rqstIds_.push(fuResourceId);
00342 postRqst();
00343 nbEventsRequested_++;
00344 nbEventsInBU_++;
00345 unlock();
00346 }
00347
00348 bufRef->release();
00349 }
00350
00351
00352
00353 void BU::I2O_BU_DISCARD_Callback(toolbox::mem::Reference *bufRef)
00354 {
00355 if (isHalting_) {
00356 LOG4CPLUS_WARN(log_,"Ignore BU_DISCARD message while halting.");
00357 bufRef->release();
00358 return;
00359 }
00360
00361 I2O_MESSAGE_FRAME *stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
00362 I2O_BU_DISCARD_MESSAGE_FRAME*msg =(I2O_BU_DISCARD_MESSAGE_FRAME*)stdMsg;
00363 unsigned int buResourceId=msg->buResourceId[0];
00364
00365 lock();
00366 int result=sentIds_.erase(buResourceId);
00367 unlock();
00368
00369 if (!result) {
00370 LOG4CPLUS_ERROR(log_,"can't discard unknown buResourceId '"<<buResourceId<<"'");
00371 }
00372 else {
00373 lock();
00374 freeIds_.push(buResourceId);
00375 nbEventsDiscarded_.value_++;
00376 unlock();
00377 postBuild();
00378 }
00379
00380 bufRef->release();
00381 }
00382
00383
00384
00385 void BU::actionPerformed(xdata::Event& e)
00386 {
00387 gui_->monInfoSpace()->lock();
00388 if (e.type()=="urn:xdata-event:ItemGroupRetrieveEvent") {
00389 mode_=(0==PlaybackRawDataProvider::instance())?"RANDOM":"PLAYBACK";
00390 if (0!=i2oPool_) memUsedInMB_=i2oPool_->getMemoryUsage().getUsed()*9.53674e-07;
00391 else memUsedInMB_=0.0;
00392 }
00393 else if (e.type()=="ItemChangedEvent") {
00394 string item=dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00395 if (item=="crc") BUEvent::setComputeCrc(crc_.value_);
00396 }
00397 gui_->monInfoSpace()->unlock();
00398 }
00399
00400
00401
00402 void BU::webPageRequest(xgi::Input *in,xgi::Output *out)
00403 throw (xgi::exception::Exception)
00404 {
00405 string name=in->getenv("PATH_INFO");
00406 if (name.empty()) name="defaultWebPage";
00407 static_cast<xgi::MethodSignature*>(gui_->getMethod(name))->invoke(in,out);
00408 }
00409
00410
00411
00412 void BU::customWebPage(xgi::Input*in,xgi::Output*out)
00413 throw (xgi::exception::Exception)
00414 {
00415 *out<<"<html></html>"<<endl;
00416 }
00417
00418
00419
00420 void BU::startBuildingWorkLoop() throw (evf::Exception)
00421 {
00422 try {
00423 LOG4CPLUS_INFO(log_,"Start 'building' workloop");
00424 wlBuilding_=
00425 toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+
00426 "Building",
00427 "waiting");
00428 if (!wlBuilding_->isActive()) wlBuilding_->activate();
00429 asBuilding_=toolbox::task::bind(this,&BU::building,sourceId_+"Building");
00430 wlBuilding_->submit(asBuilding_);
00431 isBuilding_=true;
00432 }
00433 catch (xcept::Exception& e) {
00434 string msg = "Failed to start workloop 'building'.";
00435 XCEPT_RETHROW(evf::Exception,msg,e);
00436 }
00437 }
00438
00439
00440
00441 bool BU::building(toolbox::task::WorkLoop* wl)
00442 {
00443 waitBuild();
00444 lock();
00445 unsigned int buResourceId=freeIds_.front(); freeIds_.pop();
00446 unlock();
00447
00448 if (buResourceId>=(uint32_t)events_.size()) {
00449 LOG4CPLUS_INFO(log_,"shutdown 'building' workloop.");
00450 isBuilding_=false;
00451 return false;
00452 }
00453
00454 if (!isHalting_) {
00455 BUEvent* evt=events_[buResourceId];
00456 if(generateEvent(evt)) {
00457 lock();
00458 nbEventsBuilt_++;
00459 builtIds_.push(buResourceId);
00460 unlock();
00461
00462 postSend();
00463 }
00464 else {
00465 LOG4CPLUS_INFO(log_,"building:received null post");
00466 lock();
00467 unsigned int saveBUResourceId = buResourceId;
00468
00469 freeIds_.push(saveBUResourceId);
00470 unlock();
00471 isBuilding_=false;
00472 return false;
00473 }
00474 }
00475 return true;
00476 }
00477
00478
00479
00480 void BU::startSendingWorkLoop() throw (evf::Exception)
00481 {
00482 try {
00483 LOG4CPLUS_INFO(log_,"Start 'sending' workloop");
00484 wlSending_=toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+
00485 "Sending",
00486 "waiting");
00487 if (!wlSending_->isActive()) wlSending_->activate();
00488
00489 asSending_=toolbox::task::bind(this,&BU::sending,sourceId_+"Sending");
00490 wlSending_->submit(asSending_);
00491 isSending_=true;
00492 }
00493 catch (xcept::Exception& e) {
00494 string msg = "Failed to start workloop 'sending'.";
00495 XCEPT_RETHROW(evf::Exception,msg,e);
00496 }
00497 }
00498
00499
00500
00501 bool BU::sending(toolbox::task::WorkLoop* wl)
00502 {
00503 waitSend();
00504 lock();
00505 unsigned int buResourceId=builtIds_.front(); builtIds_.pop();
00506 unlock();
00507
00508 if (buResourceId>=(uint32_t)events_.size()) {
00509 LOG4CPLUS_INFO(log_,"shutdown 'sending' workloop.");
00510 isSending_=false;
00511 return false;
00512 }
00513
00514 if (!isHalting_) {
00515 waitRqst();
00516 lock();
00517 unsigned int fuResourceId=rqstIds_.front(); rqstIds_.pop();
00518 unlock();
00519
00520 BUEvent* evt=events_[buResourceId];
00521 toolbox::mem::Reference* msg=createMsgChain(evt,fuResourceId);
00522
00523 lock();
00524 sumOfSquares_+=(uint64_t)evt->evtSize()*(uint64_t)evt->evtSize();
00525 sumOfSizes_ +=evt->evtSize();
00526 nbEventsInBU_--;
00527 nbEventsSent_++;
00528 sentIds_.insert(buResourceId);
00529 unlock();
00530
00531 buAppContext_->postFrame(msg,buAppDesc_,fuAppDesc_);
00532 }
00533
00534 return true;
00535 }
00536
00537
00538
00539 void BU::startMonitoringWorkLoop() throw (evf::Exception)
00540 {
00541 struct timezone timezone;
00542 gettimeofday(&monStartTime_,&timezone);
00543
00544 try {
00545 LOG4CPLUS_INFO(log_,"Start 'monitoring' workloop");
00546 wlMonitoring_=
00547 toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+
00548 "Monitoring",
00549 "waiting");
00550 if (!wlMonitoring_->isActive()) wlMonitoring_->activate();
00551 asMonitoring_=toolbox::task::bind(this,&BU::monitoring,sourceId_+"Monitoring");
00552 wlMonitoring_->submit(asMonitoring_);
00553 }
00554 catch (xcept::Exception& e) {
00555 string msg = "Failed to start workloop 'monitoring'.";
00556 XCEPT_RETHROW(evf::Exception,msg,e);
00557 }
00558 }
00559
00560
00561
00562 bool BU::monitoring(toolbox::task::WorkLoop* wl)
00563 {
00564 struct timeval monEndTime;
00565 struct timezone timezone;
00566
00567 gettimeofday(&monEndTime,&timezone);
00568
00569 lock();
00570 unsigned int monN =nbEventsBuilt_.value_;
00571 uint64_t monSumOfSquares=sumOfSquares_;
00572 unsigned int monSumOfSizes =sumOfSizes_;
00573 uint64_t deltaSumOfSquares;
00574 unlock();
00575
00576 gui_->monInfoSpace()->lock();
00577
00578 deltaT_.value_=deltaT(&monStartTime_,&monEndTime);
00579 monStartTime_=monEndTime;
00580
00581 deltaN_.value_=monN-monLastN_;
00582 monLastN_=monN;
00583
00584 deltaSumOfSquares=monSumOfSquares-monLastSumOfSquares_;
00585 deltaSumOfSquares_.value_=(double)deltaSumOfSquares;
00586 monLastSumOfSquares_=monSumOfSquares;
00587
00588 deltaSumOfSizes_.value_=monSumOfSizes-monLastSumOfSizes_;
00589 monLastSumOfSizes_=monSumOfSizes;
00590
00591 if (deltaT_.value_!=0) {
00592 throughput_=deltaSumOfSizes_.value_/deltaT_.value_;
00593 rate_ =deltaN_.value_/deltaT_.value_;
00594 }
00595 else {
00596 throughput_=0.0;
00597 rate_ =0.0;
00598 }
00599
00600 double meanOfSquares,mean,squareOfMean,variance;
00601
00602 if(deltaN_.value_!=0) {
00603 meanOfSquares=deltaSumOfSquares_.value_/((double)(deltaN_.value_));
00604 mean=((double)(deltaSumOfSizes_.value_))/((double)(deltaN_.value_));
00605 squareOfMean=mean*mean;
00606 variance=meanOfSquares-squareOfMean;
00607 average_=deltaSumOfSizes_.value_/deltaN_.value_;
00608 rms_ =std::sqrt(variance);
00609 }
00610 else {
00611 average_=0.0;
00612 rms_ =0.0;
00613 }
00614
00615 gui_->monInfoSpace()->unlock();
00616
00617 ::sleep(monSleepSec_.value_);
00618
00619 return true;
00620 }
00621
00622
00623
00625
00627
00628
00629 void BU::exportParameters()
00630 {
00631 if (0==gui_) {
00632 LOG4CPLUS_ERROR(log_,"No GUI, can't export parameters");
00633 return;
00634 }
00635
00636 gui_->addMonitorParam("url", &url_);
00637 gui_->addMonitorParam("class", &class_);
00638 gui_->addMonitorParam("instance", &instance_);
00639 gui_->addMonitorParam("hostname", &hostname_);
00640 gui_->addMonitorParam("runNumber", &runNumber_);
00641 gui_->addMonitorParam("stateName", fsm_.stateName());
00642 gui_->addMonitorParam("memUsedInMB", &memUsedInMB_);
00643 gui_->addMonitorParam("deltaT", &deltaT_);
00644 gui_->addMonitorParam("deltaN", &deltaN_);
00645 gui_->addMonitorParam("deltaSumOfSquares", &deltaSumOfSquares_);
00646 gui_->addMonitorParam("deltaSumOfSizes", &deltaSumOfSizes_);
00647 gui_->addMonitorParam("throughput", &throughput_);
00648 gui_->addMonitorParam("average", &average_);
00649 gui_->addMonitorParam("rate", &rate_);
00650 gui_->addMonitorParam("rms", &rms_);
00651
00652 gui_->addMonitorCounter("nbEvtsInBU", &nbEventsInBU_);
00653 gui_->addMonitorCounter("nbEvtsRequested", &nbEventsRequested_);
00654 gui_->addMonitorCounter("nbEvtsBuilt", &nbEventsBuilt_);
00655 gui_->addMonitorCounter("nbEvtsSent", &nbEventsSent_);
00656 gui_->addMonitorCounter("nbEvtsDiscarded", &nbEventsDiscarded_);
00657
00658 gui_->addStandardParam("mode", &mode_);
00659 gui_->addStandardParam("replay", &replay_);
00660 gui_->addStandardParam("overwriteEvtId", &overwriteEvtId_);
00661 gui_->addStandardParam("overwriteLsId", &overwriteLsId_);
00662 gui_->addStandardParam("fakeLsUpdateSecs", &fakeLsUpdateSecs_);
00663 gui_->addStandardParam("crc", &crc_);
00664 gui_->addStandardParam("firstEvent", &firstEvent_);
00665 gui_->addStandardParam("queueSize", &queueSize_);
00666 gui_->addStandardParam("eventBufferSize", &eventBufferSize_);
00667 gui_->addStandardParam("msgBufferSize", &msgBufferSize_);
00668 gui_->addStandardParam("fedSizeMax", &fedSizeMax_);
00669 gui_->addStandardParam("fedSizeMean", &fedSizeMean_);
00670 gui_->addStandardParam("fedSizeWidth", &fedSizeWidth_);
00671 gui_->addStandardParam("useFixedFedSize", &useFixedFedSize_);
00672 gui_->addStandardParam("monSleepSec", &monSleepSec_);
00673 gui_->addStandardParam("rcmsStateListener", fsm_.rcmsStateListener());
00674 gui_->addStandardParam("foundRcmsStateListener",fsm_.foundRcmsStateListener());
00675
00676
00677 gui_->exportParameters();
00678
00679 gui_->addItemChangedListener("crc",this);
00680
00681 }
00682
00683
00684
00685 void BU::reset()
00686 {
00687 gui_->resetCounters();
00688
00689 deltaT_ =0.0;
00690 deltaN_ = 0;
00691 deltaSumOfSquares_ = 0;
00692 deltaSumOfSizes_ = 0;
00693
00694 throughput_ =0.0;
00695 average_ = 0;
00696 rate_ = 0;
00697 rms_ = 0;
00698
00699 monLastN_ = 0;
00700 monLastSumOfSquares_= 0;
00701 monLastSumOfSizes_ = 0;
00702
00703 while (events_.size()) {
00704 delete events_.back();
00705 events_.pop_back();
00706 }
00707
00708 while (!rqstIds_.empty()) rqstIds_.pop();
00709 while (!freeIds_.empty()) freeIds_.pop();
00710 while (!builtIds_.empty()) builtIds_.pop();
00711 sentIds_.clear();
00712
00713 sem_init(&lock_,0,1);
00714 sem_init(&buildSem_,0,queueSize_);
00715 sem_init(&sendSem_,0,0);
00716 sem_init(&rqstSem_,0,0);
00717
00718 for (unsigned int i=0;i<queueSize_;i++) {
00719 events_.push_back(new BUEvent(i,eventBufferSize_));
00720 freeIds_.push(i);
00721 }
00722 validFedIds_.clear();
00723 fakeLs_=0;
00724 }
00725
00726
00727 double BU::deltaT(const struct timeval *start,const struct timeval *end)
00728 {
00729 unsigned int sec;
00730 unsigned int usec;
00731
00732 sec = end->tv_sec - start->tv_sec;
00733
00734 if(end->tv_usec > start->tv_usec) {
00735 usec = end->tv_usec - start->tv_usec;
00736 }
00737 else {
00738 sec--;
00739 usec = 1000000 - ((unsigned int )(start->tv_usec - end->tv_usec));
00740 }
00741
00742 return ((double)sec) + ((double)usec) / 1000000.0;
00743 }
00744
00745
00746
00747 bool BU::generateEvent(BUEvent* evt)
00748 {
00749
00750 if (replay_.value_&&nbEventsBuilt_>=(uint32_t)events_.size())
00751 {
00752 if (0!=PlaybackRawDataProvider::instance())
00753 PlaybackRawDataProvider::instance()->setFreeToEof();
00754 return true;
00755 }
00756
00757 if (0!=PlaybackRawDataProvider::instance()) {
00758
00759 unsigned int runNumber,evtNumber;
00760
00761 FEDRawDataCollection* event=
00762 PlaybackRawDataProvider::instance()->getFEDRawData(runNumber,evtNumber);
00763 if(event == 0) return false;
00764 evt->initialize(evtNumber);
00765
00766 for (unsigned int i=0;i<validFedIds_.size();i++) {
00767 unsigned int fedId =validFedIds_[i];
00768 unsigned int fedSize=event->FEDData(fedId).size();
00769 unsigned char* fedAddr=event->FEDData(fedId).data();
00770 if (overwriteEvtId_.value_ && fedAddr != 0) {
00771 fedh_t *fedHeader=(fedh_t*)fedAddr;
00772 fedHeader->eventid=(fedHeader->eventid&0xFF000000)+(evtNumber&0x00FFFFFF);
00773 }
00774 if (fedSize>0) evt->writeFed(fedId,fedAddr,fedSize);
00775 }
00776 delete event;
00777 }
00778
00779 else {
00780 unsigned int evtNumber=(firstEvent_+evtNumber_++)%0x1000000;
00781 evt->initialize(evtNumber);
00782 unsigned int fedSizeMin=fedHeaderSize_+fedTrailerSize_;
00783 for (unsigned int i=0;i<validFedIds_.size();i++) {
00784 unsigned int fedId(validFedIds_[i]);
00785 unsigned int fedSize(fedSizeMean_);
00786 if (!useFixedFedSize_) {
00787 double logFedSize=CLHEP::RandGauss::shoot(gaussianMean_,gaussianWidth_);
00788 fedSize=(unsigned int)(std::exp(logFedSize));
00789 if (fedSize<fedSizeMin) fedSize=fedSizeMin;
00790 if (fedSize>fedSizeMax_) fedSize=fedSizeMax_;
00791 fedSize-=fedSize%8;
00792 }
00793
00794 evt->writeFed(fedId,0,fedSize);
00795 evt->writeFedHeader(i);
00796 evt->writeFedTrailer(i);
00797 }
00798
00799 }
00800 return true;
00801 }
00802
00803
00804
00805 toolbox::mem::Reference *BU::createMsgChain(BUEvent* evt,
00806 unsigned int fuResourceId)
00807 {
00808 unsigned int msgHeaderSize =sizeof(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME);
00809 unsigned int msgPayloadSize=msgBufferSize_-msgHeaderSize;
00810
00811 if((msgPayloadSize%4)!=0) LOG4CPLUS_ERROR(log_,"Invalid Payload Size.");
00812
00813
00814 if (overwriteLsId_.value_) {
00815
00816 struct timezone tz;
00817 if (!fakeLs_) {
00818 fakeLs_++;
00819 gettimeofday(&lastLsUpdate_,&tz);
00820 }
00821 else {
00822 timeval newLsUpdate;
00823 gettimeofday(&newLsUpdate,&tz);
00824 if ((unsigned long)1000000*newLsUpdate.tv_sec+newLsUpdate.tv_usec
00825 - (unsigned long)1000000*lastLsUpdate_.tv_sec+lastLsUpdate_.tv_usec
00826 >= fakeLsUpdateSecs_.value_*1000000)
00827 {
00828 fakeLs_++;
00829 lastLsUpdate_=newLsUpdate;
00830 }
00831 }
00832
00833 int gtpFedPos_=-1;
00834 int egtpFedPos_=-1;
00835 for (size_t k=0;k<validFedIds_.size();k++) {
00836 if (evt->fedId(k)==FEDNumbering::MINTriggerGTPFEDID) {
00837
00838 unsigned char * fgtpAddr = evt->fedAddr(k);
00839 unsigned int fgtpSize = evt->fedSize(k);
00840 if (fgtpAddr && fgtpSize) {
00841 gtpFedPos_=(int)k;
00842 evtn::evm_board_sense(fgtpAddr,fgtpSize);
00843 *((unsigned short*)fgtpAddr
00844 +sizeof(fedh_t)/sizeof(unsigned short)
00845 + (evtn::EVM_GTFE_BLOCK*2 + evtn::EVM_TCS_LSBLNR_OFFSET)*evtn::SLINK_HALFWORD_SIZE /sizeof(unsigned short)
00846 ) = (unsigned short)fakeLs_-1;
00847 }
00848 }
00849 if (evt->fedId(k)==FEDNumbering::MINTriggerEGTPFEDID) {
00850
00851 unsigned char * fegtpAddr = evt->fedAddr(egtpFedPos_);
00852 unsigned int fegtpSize = evt->fedSize(egtpFedPos_);
00853 if (fegtpAddr && fegtpSize) {
00854 egtpFedPos_=(int)k;
00855 *( (unsigned int*)fegtpAddr + evtn::GTPE_ORBTNR_OFFSET * evtn::SLINK_HALFWORD_SIZE/sizeof(unsigned int)
00856 ) = (unsigned int)(fakeLs_-1)*0x00100000;
00857 }
00858 }
00859 }
00860 if (gtpFedPos_<0) LOG4CPLUS_ERROR(log_,"Unable to find GTP FED in event!");
00861 if (egtpFedPos_<0 && gtpFedPos_<0) LOG4CPLUS_ERROR(log_,"Unable to find GTP or GTPE FED in event!");
00862 }
00863
00864 toolbox::mem::Reference *head =0;
00865 toolbox::mem::Reference *tail =0;
00866 toolbox::mem::Reference *bufRef=0;
00867
00868 I2O_MESSAGE_FRAME *stdMsg=0;
00869 I2O_PRIVATE_MESSAGE_FRAME *pvtMsg=0;
00870 I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block =0;
00871
00872 unsigned int iFed =0;
00873 unsigned int nSuperFrag =64;
00874 unsigned int nFedPerSuperFrag=validFedIds_.size()/nSuperFrag;
00875 unsigned int nBigSuperFrags =validFedIds_.size()%nSuperFrag;
00876
00877 if (evt->nFed()<nSuperFrag) {
00878 nSuperFrag=evt->nFed();
00879 nFedPerSuperFrag=1;
00880 nBigSuperFrags=0;
00881 }
00882 else
00883 {
00884 nFedPerSuperFrag=evt->nFed()/nSuperFrag;
00885 nBigSuperFrags =evt->nFed()%nSuperFrag;
00886 }
00887
00888 for (unsigned int iSuperFrag=0;iSuperFrag<nSuperFrag;iSuperFrag++) {
00889
00890
00891 unsigned int nFed=iFed+nFedPerSuperFrag;
00892 if (iSuperFrag<nBigSuperFrags) ++nFed;
00893
00894
00895 unsigned int nBlock =0;
00896 unsigned int curbSize=frlHeaderSize_;
00897 unsigned int totSize =curbSize;
00898 for (unsigned int i=iFed;i<nFed;i++) {
00899 curbSize+=evt->fedSize(i);
00900 totSize+=evt->fedSize(i);
00901 if (curbSize>msgPayloadSize) {
00902 curbSize+=frlHeaderSize_*(curbSize/msgPayloadSize);
00903 if(curbSize%msgPayloadSize)totSize+=frlHeaderSize_*(curbSize/msgPayloadSize);
00904 else totSize+=frlHeaderSize_*((curbSize/msgPayloadSize)-1);
00905 curbSize=curbSize%msgPayloadSize;
00906 }
00907 }
00908 nBlock=totSize/msgPayloadSize+(totSize%msgPayloadSize>0 ? 1 : 0);
00909
00910
00911
00912 unsigned int remainder =0;
00913 bool fedTrailerLeft=false;
00914 bool last =false;
00915 bool warning =false;
00916 unsigned char *startOfPayload=0;
00917 U32 payload(0);
00918
00919 for(unsigned int iBlock=0;iBlock<nBlock;iBlock++) {
00920
00921
00922 payload=msgPayloadSize;
00923
00924
00925 try {
00926 bufRef=toolbox::mem::getMemoryPoolFactory()->getFrame(i2oPool_,
00927 msgBufferSize_);
00928 }
00929 catch(xcept::Exception &e) {
00930 LOG4CPLUS_FATAL(log_,"xdaq::frameAlloc failed");
00931 }
00932
00933
00934 stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
00935 pvtMsg=(I2O_PRIVATE_MESSAGE_FRAME*)stdMsg;
00936 block =(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)stdMsg;
00937
00938 pvtMsg->XFunctionCode =I2O_FU_TAKE;
00939 pvtMsg->OrganizationID =XDAQ_ORGANIZATION_ID;
00940
00941 stdMsg->MessageSize =(msgHeaderSize + payload) >> 2;
00942 stdMsg->Function =I2O_PRIVATE_MESSAGE;
00943 stdMsg->VersionOffset =0;
00944 stdMsg->MsgFlags =0;
00945 stdMsg->InitiatorAddress=i2o::utils::getAddressMap()->getTid(buAppDesc_);
00946 stdMsg->TargetAddress =i2o::utils::getAddressMap()->getTid(fuAppDesc_);
00947
00948 block->buResourceId =evt->buResourceId();
00949 block->fuTransactionId =fuResourceId;
00950 block->blockNb =iBlock;
00951 block->nbBlocksInSuperFragment=nBlock;
00952 block->superFragmentNb =iSuperFrag;
00953 block->nbSuperFragmentsInEvent=nSuperFrag;
00954 block->eventNumber =evt->evtNumber();
00955
00956
00957 startOfPayload =(unsigned char*)block+msgHeaderSize;
00958 frlh_t* frlHeader=(frlh_t*)startOfPayload;
00959 frlHeader->trigno=evt->evtNumber();
00960 frlHeader->segno =iBlock;
00961
00962 unsigned char *startOfFedBlocks=startOfPayload+frlHeaderSize_;
00963 payload -=frlHeaderSize_;
00964 frlHeader->segsize =payload;
00965 unsigned int leftspace=payload;
00966
00967
00968 if(fedTrailerLeft) {
00969 memcpy(startOfFedBlocks,
00970 evt->fedAddr(iFed)+evt->fedSize(iFed)-fedTrailerSize_,
00971 fedTrailerSize_);
00972
00973 startOfFedBlocks+=fedTrailerSize_;
00974 leftspace -=fedTrailerSize_;
00975 remainder =0;
00976 fedTrailerLeft =false;
00977
00978
00979 if((iFed==nFed-1) && !last) {
00980 frlHeader->segsize-=leftspace;
00981 int msgSize=stdMsg->MessageSize << 2;
00982 msgSize -=leftspace;
00983 bufRef->setDataSize(msgSize);
00984 stdMsg->MessageSize = msgSize >> 2;
00985 frlHeader->segsize=frlHeader->segsize | FRL_LAST_SEGM;
00986 last=true;
00987 }
00988
00989
00990 iFed++;
00991 }
00992
00996 if (remainder>0) {
00997
00998
00999 if(payload>=remainder) {
01000 memcpy(startOfFedBlocks,
01001 evt->fedAddr(iFed)+evt->fedSize(iFed)-remainder,
01002 remainder);
01003
01004 startOfFedBlocks+=remainder;
01005 leftspace -=remainder;
01006
01007
01008 if(iFed==nFed-1) {
01009 frlHeader->segsize-=leftspace;
01010 int msgSize=stdMsg->MessageSize << 2;
01011 msgSize -=leftspace;
01012 bufRef->setDataSize(msgSize);
01013 stdMsg->MessageSize = msgSize >> 2;
01014 frlHeader->segsize=frlHeader->segsize | FRL_LAST_SEGM;
01015 last=true;
01016 }
01017
01018
01019 iFed++;
01020
01021
01022 remainder=0;
01023 }
01024
01025 else if (payload>=(remainder-fedTrailerSize_)) {
01026 memcpy(startOfFedBlocks,
01027 evt->fedAddr(iFed)+evt->fedSize(iFed)-remainder,
01028 remainder-fedTrailerSize_);
01029
01030 frlHeader->segsize=remainder-fedTrailerSize_;
01031 fedTrailerLeft =true;
01032 leftspace -=(remainder-fedTrailerSize_);
01033 remainder =fedTrailerSize_;
01034 }
01035
01036 else {
01037 memcpy(startOfFedBlocks,
01038 evt->fedAddr(iFed)+evt->fedSize(iFed)-remainder,payload);
01039 remainder-=payload;
01040 leftspace =0;
01041 }
01042 }
01043
01047 if(remainder==0) {
01048
01049
01050 while(iFed<nFed) {
01051
01052
01053 if((int)leftspace<fedHeaderSize_) {
01054 frlHeader->segsize-=leftspace;
01055 break;
01056 }
01057
01058 memcpy(startOfFedBlocks,evt->fedAddr(iFed),fedHeaderSize_);
01059
01060 leftspace -=fedHeaderSize_;
01061 startOfFedBlocks+=fedHeaderSize_;
01062
01063
01064 if(evt->fedSize(iFed)-fedHeaderSize_<=leftspace) {
01065 memcpy(startOfFedBlocks,
01066 evt->fedAddr(iFed)+fedHeaderSize_,
01067 evt->fedSize(iFed)-fedHeaderSize_);
01068
01069 leftspace -=(evt->fedSize(iFed)-fedHeaderSize_);
01070 startOfFedBlocks+=(evt->fedSize(iFed)-fedHeaderSize_);
01071 }
01072
01073 else if(evt->fedSize(iFed)-fedHeaderSize_-fedTrailerSize_<=leftspace) {
01074 memcpy(startOfFedBlocks,
01075 evt->fedAddr(iFed)+fedHeaderSize_,
01076 evt->fedSize(iFed)-fedHeaderSize_-fedTrailerSize_);
01077
01078 leftspace -=(evt->fedSize(iFed)-fedHeaderSize_-fedTrailerSize_);
01079 frlHeader->segsize-=leftspace;
01080 fedTrailerLeft =true;
01081 remainder =fedTrailerSize_;
01082
01083 break;
01084 }
01085
01086 else {
01087 memcpy(startOfFedBlocks,evt->fedAddr(iFed)+fedHeaderSize_,leftspace);
01088 remainder=evt->fedSize(iFed)-fedHeaderSize_-leftspace;
01089 leftspace=0;
01090
01091 break;
01092 }
01093
01094
01095 iFed++;
01096
01097 }
01098
01099
01100 if (iFed==nFed && remainder==0 && !last) {
01101 frlHeader->segsize-=leftspace;
01102 int msgSize=stdMsg->MessageSize << 2;
01103 msgSize -=leftspace;
01104 bufRef->setDataSize(msgSize);
01105 stdMsg->MessageSize=msgSize >> 2;
01106 frlHeader->segsize =frlHeader->segsize | FRL_LAST_SEGM;
01107 last=true;
01108 }
01109
01110 }
01111
01112 if(iSuperFrag==0&&iBlock==0) {
01113 head=bufRef;
01114 tail=bufRef;
01115 }
01116 else {
01117 tail->setNextReference(bufRef);
01118 tail=bufRef;
01119 }
01120
01121 if((iBlock==nBlock-1) && remainder!=0) {
01122 nBlock++;
01123 warning=true;
01124 }
01125
01126 }
01127
01128
01129 if(warning) {
01130 toolbox::mem::Reference* next=head;
01131 do {
01132 block =(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)next->getDataLocation();
01133 if (block->superFragmentNb==iSuperFrag)
01134 block->nbBlocksInSuperFragment=nBlock;
01135 } while((next=next->getNextReference()));
01136 }
01137
01138 }
01139
01140 return head;
01141 }
01142
01143
01144 void BU::dumpFrame(unsigned char* data,unsigned int len)
01145 {
01146 char left1[20];
01147 char left2[20];
01148 char right1[20];
01149 char right2[20];
01150
01151 printf("Byte 0 1 2 3 4 5 6 7\n");
01152
01153 int c(0);
01154 int pos(0);
01155
01156 for (unsigned int i=0;i<(len/8);i++) {
01157 int rpos(0);
01158 int off(3);
01159 for (pos=0;pos<12;pos+=3) {
01160 sprintf(&left1[pos],"%2.2x ",
01161 ((unsigned char*)data)[c+off]);
01162 sprintf(&right1[rpos],"%1c",
01163 ((data[c+off] > 32)&&(data[c+off] < 127)) ? data[c+off] : '.');
01164 sprintf (&left2[pos],"%2.2x ",
01165 ((unsigned char*)data)[c+off+4]);
01166 sprintf (&right2[rpos],"%1c",
01167 ((data[c+off+4] > 32)&&(data[c+off+4]<127)) ? data[c+off+4] : '.');
01168 rpos++;
01169 off--;
01170 }
01171 c+=8;
01172
01173 printf ("%4d: %s%s || %s%s %p\n",
01174 c-8, left1, left2, right1, right2, &data[c-8]);
01175 }
01176
01177 fflush(stdout);
01178 }
01179
01180
01182
01184
01185 XDAQ_INSTANTIATOR_IMPL(BU)