00001
00002
00003
00004
00005
00006
00007
00009
00010
00011 #include "EventFilter/AutoBU/interface/BU.h"
00012
00013 #include "FWCore/Utilities/interface/CRC16.h"
00014
00015 #include "DataFormats/FEDRawData/interface/FEDNumbering.h"
00016 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
00017
00018 #include "xoap/SOAPEnvelope.h"
00019 #include "xoap/SOAPBody.h"
00020 #include "xoap/domutils.h"
00021
00022 #include <netinet/in.h>
00023 #include <sstream>
00024
00025
00026 using namespace std;
00027 using namespace evf;
00028
00029
00031
00033
00034
00035 BU::BU(xdaq::ApplicationStub *s)
00036 : xdaq::Application(s)
00037 , log_(getApplicationLogger())
00038 , buAppDesc_(getApplicationDescriptor())
00039 , fuAppDesc_(0)
00040 , buAppContext_(getApplicationContext())
00041 , fsm_(this)
00042 , gui_(0)
00043 , evtNumber_(0)
00044 , isBuilding_(false)
00045 , isSending_(false)
00046 , isHalting_(false)
00047 , wlBuilding_(0)
00048 , asBuilding_(0)
00049 , wlSending_(0)
00050 , asSending_(0)
00051 , wlMonitoring_(0)
00052 , asMonitoring_(0)
00053 , instance_(0)
00054 , runNumber_(0)
00055 , memUsedInMB_(0.0)
00056 , deltaT_(0.0)
00057 , deltaN_(0)
00058 , deltaSumOfSquares_(0)
00059 , deltaSumOfSizes_(0)
00060 , throughput_(0.0)
00061 , average_(0.0)
00062 , rate_(0.0)
00063 , rms_(0.0)
00064 , nbEventsInBU_(0)
00065 , nbEventsRequested_(0)
00066 , nbEventsBuilt_(0)
00067 , nbEventsSent_(0)
00068 , nbEventsDiscarded_(0)
00069 , mode_("RANDOM")
00070 , replay_(false)
00071 , crc_(true)
00072 , overwriteEvtId_(true)
00073 , firstEvent_(1)
00074 , queueSize_(32)
00075 , eventBufferSize_(0x400000)
00076 , msgBufferSize_(32768)
00077 , fedSizeMax_(65536)
00078 , fedSizeMean_(1024)
00079 , fedSizeWidth_(1024)
00080 , useFixedFedSize_(false)
00081 , monSleepSec_(1)
00082 , gaussianMean_(0.0)
00083 , gaussianWidth_(1.0)
00084 , monLastN_(0)
00085 , monLastSumOfSquares_(0)
00086 , monLastSumOfSizes_(0)
00087 , sumOfSquares_(0)
00088 , sumOfSizes_(0)
00089 , i2oPool_(0)
00090 {
00091
00092 fsm_.initialize<evf::BU>(this);
00093
00094
00095 url_ =
00096 getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00097 getApplicationDescriptor()->getURN();
00098 class_ =getApplicationDescriptor()->getClassName();
00099 instance_=getApplicationDescriptor()->getInstance();
00100 hostname_=getApplicationDescriptor()->getContextDescriptor()->getURL();
00101 sourceId_=class_.toString()+instance_.toString();
00102
00103
00104 i2o::bind(this,&BU::I2O_BU_ALLOCATE_Callback,I2O_BU_ALLOCATE,XDAQ_ORGANIZATION_ID);
00105 i2o::bind(this,&BU::I2O_BU_DISCARD_Callback, I2O_BU_DISCARD, XDAQ_ORGANIZATION_ID);
00106
00107
00108 string i2oPoolName=sourceId_+"_i2oPool";
00109 try {
00110 toolbox::mem::HeapAllocator *allocator=new toolbox::mem::HeapAllocator();
00111 toolbox::net::URN urn("toolbox-mem-pool",i2oPoolName);
00112 toolbox::mem::MemoryPoolFactory* poolFactory=
00113 toolbox::mem::getMemoryPoolFactory();
00114 i2oPool_=poolFactory->createPool(urn,allocator);
00115 }
00116 catch (toolbox::mem::exception::Exception& e) {
00117 string s="Failed to create pool: "+i2oPoolName;
00118 LOG4CPLUS_FATAL(log_,s);
00119 XCEPT_RETHROW(xcept::Exception,s,e);
00120 }
00121
00122
00123 xgi::bind(this,&evf::BU::webPageRequest,"Default");
00124 gui_=new WebGUI(this,&fsm_);
00125 gui_->setSmallAppIcon("/rubuilder/bu/images/bu32x32.gif");
00126 gui_->setLargeAppIcon("/rubuilder/bu/images/bu64x64.gif");
00127
00128 vector<toolbox::lang::Method*> methods=gui_->getMethods();
00129 vector<toolbox::lang::Method*>::iterator it;
00130 for (it=methods.begin();it!=methods.end();++it) {
00131 if ((*it)->type()=="cgi") {
00132 string name=static_cast<xgi::MethodSignature*>(*it)->name();
00133 xgi::bind(this,&evf::BU::webPageRequest,name);
00134 }
00135 }
00136 xgi::bind(this,&evf::BU::customWebPage,"customWebPage");
00137
00138
00139
00140 exportParameters();
00141
00142
00143 fsm_.findRcmsStateListener();
00144
00145
00146 gaussianMean_ =std::log((double)fedSizeMean_);
00147 gaussianWidth_=std::sqrt(std::log
00148 (0.5*
00149 (1+std::sqrt
00150 (1.0+4.0*
00151 fedSizeWidth_.value_*fedSizeWidth_.value_/
00152 fedSizeMean_.value_/fedSizeMean_.value_))));
00153
00154
00155 startMonitoringWorkLoop();
00156
00157
00158 BUEvent::setComputeCrc(crc_.value_);
00159 }
00160
00161
00162
00163 BU::~BU()
00164 {
00165 while (!events_.empty()) { delete events_.back(); events_.pop_back(); }
00166 }
00167
00168
00170
00172
00173
00174 bool BU::configuring(toolbox::task::WorkLoop* wl)
00175 {
00176 isHalting_=false;
00177 try {
00178 LOG4CPLUS_INFO(log_,"Start configuring ...");
00179 reset();
00180 LOG4CPLUS_INFO(log_,"Finished configuring!");
00181 fsm_.fireEvent("ConfigureDone",this);
00182 }
00183 catch (xcept::Exception &e) {
00184 string msg = "configuring FAILED: " + (string)e.what();
00185 fsm_.fireFailed(msg,this);
00186 }
00187
00188 return false;
00189 }
00190
00191
00192
00193 bool BU::enabling(toolbox::task::WorkLoop* wl)
00194 {
00195 isHalting_=false;
00196 try {
00197 LOG4CPLUS_INFO(log_,"Start enabling ...");
00198
00199
00200 if (0!=PlaybackRawDataProvider::instance()) {
00201 for (unsigned int i=0;i<(unsigned int)FEDNumbering::MAXFEDID+1;i++)
00202 if (FEDNumbering::inRange(i)) validFedIds_.push_back(i);
00203 }
00204 else{
00205 for (unsigned int i=0;i<(unsigned int)FEDNumbering::MAXFEDID+1;i++)
00206 if (FEDNumbering::inRangeNoGT(i)) validFedIds_.push_back(i);
00207 }
00208 if (!isBuilding_) startBuildingWorkLoop();
00209 if (!isSending_) startSendingWorkLoop();
00210 LOG4CPLUS_INFO(log_,"Finished enabling!");
00211 fsm_.fireEvent("EnableDone",this);
00212 }
00213 catch (xcept::Exception &e) {
00214 string msg = "enabling FAILED: " + (string)e.what();
00215 fsm_.fireFailed(msg,this);
00216 }
00217
00218 return false;
00219 }
00220
00221
00222
00223 bool BU::stopping(toolbox::task::WorkLoop* wl)
00224 {
00225 try {
00226 LOG4CPLUS_INFO(log_,"Start stopping :) ...");
00227
00228 if (0!=PlaybackRawDataProvider::instance()&&
00229 (!replay_.value_||nbEventsBuilt_<(uint32_t)events_.size())) {
00230 lock();
00231 freeIds_.push(events_.size());
00232 unlock();
00233 postBuild();
00234 while (!builtIds_.empty()) {
00235 LOG4CPLUS_INFO(log_,"wait to flush ... #builtIds="<<builtIds_.size());
00236 ::sleep(1);
00237 }
00238
00239 PlaybackRawDataProvider::instance()->setFreeToEof();
00240 }
00241
00242 lock();
00243 builtIds_.push(events_.size());
00244 unlock();
00245
00246 postSend();
00247 while (!sentIds_.empty()) {
00248 LOG4CPLUS_INFO(log_,"wait to flush ...");
00249 ::sleep(1);
00250 }
00251 if (0!=PlaybackRawDataProvider::instance()&&
00252 (replay_.value_&&nbEventsBuilt_>=(uint32_t)events_.size())) {
00253 lock();
00254 freeIds_.push(events_.size());
00255 unlock();
00256 postBuild();
00257 }
00258 LOG4CPLUS_INFO(log_,"Finished stopping!");
00259 fsm_.fireEvent("StopDone",this);
00260 }
00261 catch (xcept::Exception &e) {
00262 string msg = "stopping FAILED: " + (string)e.what();
00263 fsm_.fireFailed(msg,this);
00264 }
00265 return false;
00266 }
00267
00268
00269
00270 bool BU::halting(toolbox::task::WorkLoop* wl)
00271 {
00272 try {
00273 LOG4CPLUS_INFO(log_,"Start halting ...");
00274 isHalting_=true;
00275 if (isBuilding_&&isSending_) {
00276 lock();
00277 freeIds_.push(events_.size());
00278 builtIds_.push(events_.size());
00279 unlock();
00280 postBuild();
00281 postSend();
00282 }
00283 LOG4CPLUS_INFO(log_,"Finished halting!");
00284 fsm_.fireEvent("HaltDone",this);
00285 }
00286 catch (xcept::Exception &e) {
00287 string msg = "halting FAILED: " + (string)e.what();
00288 fsm_.fireFailed(msg,this);
00289 }
00290
00291 return false;
00292 }
00293
00294
00295
00296 xoap::MessageReference BU::fsmCallback(xoap::MessageReference msg)
00297 throw (xoap::exception::Exception)
00298 {
00299 return fsm_.commandCallback(msg);
00300 }
00301
00302
00303
00304 void BU::I2O_BU_ALLOCATE_Callback(toolbox::mem::Reference *bufRef)
00305 {
00306 if (isHalting_) {
00307 LOG4CPLUS_WARN(log_,"Ignore BU_ALLOCATE message while halting.");
00308 bufRef->release();
00309 return;
00310 }
00311
00312 I2O_MESSAGE_FRAME *stdMsg;
00313 I2O_BU_ALLOCATE_MESSAGE_FRAME *msg;
00314
00315 stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
00316 msg =(I2O_BU_ALLOCATE_MESSAGE_FRAME*)stdMsg;
00317
00318 if (0==fuAppDesc_) {
00319 I2O_TID fuTid=stdMsg->InitiatorAddress;
00320 fuAppDesc_=i2o::utils::getAddressMap()->getApplicationDescriptor(fuTid);
00321 }
00322
00323 for (unsigned int i=0;i<msg->n;i++) {
00324 unsigned int fuResourceId=msg->allocate[i].fuTransactionId;
00325 lock();
00326 rqstIds_.push(fuResourceId);
00327 postRqst();
00328 nbEventsRequested_++;
00329 nbEventsInBU_++;
00330 unlock();
00331 }
00332
00333 bufRef->release();
00334 }
00335
00336
00337
00338 void BU::I2O_BU_DISCARD_Callback(toolbox::mem::Reference *bufRef)
00339 {
00340 if (isHalting_) {
00341 LOG4CPLUS_WARN(log_,"Ignore BU_DISCARD message while halting.");
00342 bufRef->release();
00343 return;
00344 }
00345
00346 I2O_MESSAGE_FRAME *stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
00347 I2O_BU_DISCARD_MESSAGE_FRAME*msg =(I2O_BU_DISCARD_MESSAGE_FRAME*)stdMsg;
00348 unsigned int buResourceId=msg->buResourceId[0];
00349
00350 lock();
00351 int result=sentIds_.erase(buResourceId);
00352 unlock();
00353
00354 if (!result) {
00355 LOG4CPLUS_ERROR(log_,"can't discard unknown buResourceId '"<<buResourceId<<"'");
00356 }
00357 else {
00358 lock();
00359 freeIds_.push(buResourceId);
00360 nbEventsDiscarded_.value_++;
00361 unlock();
00362 postBuild();
00363 }
00364
00365 bufRef->release();
00366 }
00367
00368
00369
00370 void BU::actionPerformed(xdata::Event& e)
00371 {
00372 gui_->monInfoSpace()->lock();
00373 if (e.type()=="urn:xdata-event:ItemGroupRetrieveEvent") {
00374 mode_=(0==PlaybackRawDataProvider::instance())?"RANDOM":"PLAYBACK";
00375 if (0!=i2oPool_) memUsedInMB_=i2oPool_->getMemoryUsage().getUsed()*9.53674e-07;
00376 else memUsedInMB_=0.0;
00377 }
00378 else if (e.type()=="ItemChangedEvent") {
00379 string item=dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00380 if (item=="crc") BUEvent::setComputeCrc(crc_.value_);
00381 }
00382 gui_->monInfoSpace()->unlock();
00383 }
00384
00385
00386
00387 void BU::webPageRequest(xgi::Input *in,xgi::Output *out)
00388 throw (xgi::exception::Exception)
00389 {
00390 string name=in->getenv("PATH_INFO");
00391 if (name.empty()) name="defaultWebPage";
00392 static_cast<xgi::MethodSignature*>(gui_->getMethod(name))->invoke(in,out);
00393 }
00394
00395
00396
00397 void BU::customWebPage(xgi::Input*in,xgi::Output*out)
00398 throw (xgi::exception::Exception)
00399 {
00400 *out<<"<html></html>"<<endl;
00401 }
00402
00403
00404
00405 void BU::startBuildingWorkLoop() throw (evf::Exception)
00406 {
00407 try {
00408 LOG4CPLUS_INFO(log_,"Start 'building' workloop");
00409 wlBuilding_=
00410 toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+
00411 "Building",
00412 "waiting");
00413 if (!wlBuilding_->isActive()) wlBuilding_->activate();
00414 asBuilding_=toolbox::task::bind(this,&BU::building,sourceId_+"Building");
00415 wlBuilding_->submit(asBuilding_);
00416 isBuilding_=true;
00417 }
00418 catch (xcept::Exception& e) {
00419 string msg = "Failed to start workloop 'building'.";
00420 XCEPT_RETHROW(evf::Exception,msg,e);
00421 }
00422 }
00423
00424
00425
00426 bool BU::building(toolbox::task::WorkLoop* wl)
00427 {
00428 waitBuild();
00429 lock();
00430 unsigned int buResourceId=freeIds_.front(); freeIds_.pop();
00431 unlock();
00432
00433 if (buResourceId>=(uint32_t)events_.size()) {
00434 LOG4CPLUS_INFO(log_,"shutdown 'building' workloop.");
00435 isBuilding_=false;
00436 return false;
00437 }
00438
00439 if (!isHalting_) {
00440 BUEvent* evt=events_[buResourceId];
00441 if(generateEvent(evt)) {
00442 lock();
00443 nbEventsBuilt_++;
00444 builtIds_.push(buResourceId);
00445 unlock();
00446
00447 postSend();
00448 }
00449 else {
00450 LOG4CPLUS_INFO(log_,"building:received null post");
00451 lock();
00452 unsigned int saveBUResourceId = buResourceId;
00453
00454 freeIds_.push(saveBUResourceId);
00455 unlock();
00456 isBuilding_=false;
00457 return false;
00458 }
00459 }
00460 return true;
00461 }
00462
00463
00464
00465 void BU::startSendingWorkLoop() throw (evf::Exception)
00466 {
00467 try {
00468 LOG4CPLUS_INFO(log_,"Start 'sending' workloop");
00469 wlSending_=toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+
00470 "Sending",
00471 "waiting");
00472 if (!wlSending_->isActive()) wlSending_->activate();
00473 asSending_=toolbox::task::bind(this,&BU::sending,sourceId_+"Sending");
00474 wlSending_->submit(asSending_);
00475 isSending_=true;
00476 }
00477 catch (xcept::Exception& e) {
00478 string msg = "Failed to start workloop 'sending'.";
00479 XCEPT_RETHROW(evf::Exception,msg,e);
00480 }
00481 }
00482
00483
00484
00485 bool BU::sending(toolbox::task::WorkLoop* wl)
00486 {
00487 waitSend();
00488 lock();
00489 unsigned int buResourceId=builtIds_.front(); builtIds_.pop();
00490 unlock();
00491
00492 if (buResourceId>=(uint32_t)events_.size()) {
00493 LOG4CPLUS_INFO(log_,"shutdown 'sending' workloop.");
00494 isSending_=false;
00495 return false;
00496 }
00497
00498 if (!isHalting_) {
00499 waitRqst();
00500 lock();
00501 unsigned int fuResourceId=rqstIds_.front(); rqstIds_.pop();
00502 unlock();
00503
00504 BUEvent* evt=events_[buResourceId];
00505 toolbox::mem::Reference* msg=createMsgChain(evt,fuResourceId);
00506
00507 lock();
00508 sumOfSquares_+=(uint64_t)evt->evtSize()*(uint64_t)evt->evtSize();
00509 sumOfSizes_ +=evt->evtSize();
00510 nbEventsInBU_--;
00511 nbEventsSent_++;
00512 sentIds_.insert(buResourceId);
00513 unlock();
00514
00515 buAppContext_->postFrame(msg,buAppDesc_,fuAppDesc_);
00516 }
00517
00518 return true;
00519 }
00520
00521
00522
00523 void BU::startMonitoringWorkLoop() throw (evf::Exception)
00524 {
00525 struct timezone timezone;
00526 gettimeofday(&monStartTime_,&timezone);
00527
00528 try {
00529 LOG4CPLUS_INFO(log_,"Start 'monitoring' workloop");
00530 wlMonitoring_=
00531 toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+
00532 "Monitoring",
00533 "waiting");
00534 if (!wlMonitoring_->isActive()) wlMonitoring_->activate();
00535 asMonitoring_=toolbox::task::bind(this,&BU::monitoring,sourceId_+"Monitoring");
00536 wlMonitoring_->submit(asMonitoring_);
00537 }
00538 catch (xcept::Exception& e) {
00539 string msg = "Failed to start workloop 'monitoring'.";
00540 XCEPT_RETHROW(evf::Exception,msg,e);
00541 }
00542 }
00543
00544
00545
00546 bool BU::monitoring(toolbox::task::WorkLoop* wl)
00547 {
00548 struct timeval monEndTime;
00549 struct timezone timezone;
00550
00551 gettimeofday(&monEndTime,&timezone);
00552
00553 lock();
00554 unsigned int monN =nbEventsBuilt_.value_;
00555 uint64_t monSumOfSquares=sumOfSquares_;
00556 unsigned int monSumOfSizes =sumOfSizes_;
00557 uint64_t deltaSumOfSquares;
00558 unlock();
00559
00560 gui_->monInfoSpace()->lock();
00561
00562 deltaT_.value_=deltaT(&monStartTime_,&monEndTime);
00563 monStartTime_=monEndTime;
00564
00565 deltaN_.value_=monN-monLastN_;
00566 monLastN_=monN;
00567
00568 deltaSumOfSquares=monSumOfSquares-monLastSumOfSquares_;
00569 deltaSumOfSquares_.value_=(double)deltaSumOfSquares;
00570 monLastSumOfSquares_=monSumOfSquares;
00571
00572 deltaSumOfSizes_.value_=monSumOfSizes-monLastSumOfSizes_;
00573 monLastSumOfSizes_=monSumOfSizes;
00574
00575 if (deltaT_.value_!=0) {
00576 throughput_=deltaSumOfSizes_.value_/deltaT_.value_;
00577 rate_ =deltaN_.value_/deltaT_.value_;
00578 }
00579 else {
00580 throughput_=0.0;
00581 rate_ =0.0;
00582 }
00583
00584 double meanOfSquares,mean,squareOfMean,variance;
00585
00586 if(deltaN_.value_!=0) {
00587 meanOfSquares=deltaSumOfSquares_.value_/((double)(deltaN_.value_));
00588 mean=((double)(deltaSumOfSizes_.value_))/((double)(deltaN_.value_));
00589 squareOfMean=mean*mean;
00590 variance=meanOfSquares-squareOfMean;
00591 average_=deltaSumOfSizes_.value_/deltaN_.value_;
00592 rms_ =std::sqrt(variance);
00593 }
00594 else {
00595 average_=0.0;
00596 rms_ =0.0;
00597 }
00598
00599 gui_->monInfoSpace()->unlock();
00600
00601 ::sleep(monSleepSec_.value_);
00602
00603 return true;
00604 }
00605
00606
00607
00609
00611
00612
00613 void BU::exportParameters()
00614 {
00615 if (0==gui_) {
00616 LOG4CPLUS_ERROR(log_,"No GUI, can't export parameters");
00617 return;
00618 }
00619
00620 gui_->addMonitorParam("url", &url_);
00621 gui_->addMonitorParam("class", &class_);
00622 gui_->addMonitorParam("instance", &instance_);
00623 gui_->addMonitorParam("hostname", &hostname_);
00624 gui_->addMonitorParam("runNumber", &runNumber_);
00625 gui_->addMonitorParam("stateName", fsm_.stateName());
00626 gui_->addMonitorParam("memUsedInMB", &memUsedInMB_);
00627 gui_->addMonitorParam("deltaT", &deltaT_);
00628 gui_->addMonitorParam("deltaN", &deltaN_);
00629 gui_->addMonitorParam("deltaSumOfSquares", &deltaSumOfSquares_);
00630 gui_->addMonitorParam("deltaSumOfSizes", &deltaSumOfSizes_);
00631 gui_->addMonitorParam("throughput", &throughput_);
00632 gui_->addMonitorParam("average", &average_);
00633 gui_->addMonitorParam("rate", &rate_);
00634 gui_->addMonitorParam("rms", &rms_);
00635
00636 gui_->addMonitorCounter("nbEvtsInBU", &nbEventsInBU_);
00637 gui_->addMonitorCounter("nbEvtsRequested", &nbEventsRequested_);
00638 gui_->addMonitorCounter("nbEvtsBuilt", &nbEventsBuilt_);
00639 gui_->addMonitorCounter("nbEvtsSent", &nbEventsSent_);
00640 gui_->addMonitorCounter("nbEvtsDiscarded", &nbEventsDiscarded_);
00641
00642 gui_->addStandardParam("mode", &mode_);
00643 gui_->addStandardParam("replay", &replay_);
00644 gui_->addStandardParam("overwriteEvtId", &overwriteEvtId_);
00645 gui_->addStandardParam("crc", &crc_);
00646 gui_->addStandardParam("firstEvent", &firstEvent_);
00647 gui_->addStandardParam("queueSize", &queueSize_);
00648 gui_->addStandardParam("eventBufferSize", &eventBufferSize_);
00649 gui_->addStandardParam("msgBufferSize", &msgBufferSize_);
00650 gui_->addStandardParam("fedSizeMax", &fedSizeMax_);
00651 gui_->addStandardParam("fedSizeMean", &fedSizeMean_);
00652 gui_->addStandardParam("fedSizeWidth", &fedSizeWidth_);
00653 gui_->addStandardParam("useFixedFedSize", &useFixedFedSize_);
00654 gui_->addStandardParam("monSleepSec", &monSleepSec_);
00655 gui_->addStandardParam("rcmsStateListener", fsm_.rcmsStateListener());
00656 gui_->addStandardParam("foundRcmsStateListener",fsm_.foundRcmsStateListener());
00657
00658
00659 gui_->exportParameters();
00660
00661 gui_->addItemChangedListener("crc",this);
00662
00663 }
00664
00665
00666
00667 void BU::reset()
00668 {
00669 gui_->resetCounters();
00670
00671 deltaT_ =0.0;
00672 deltaN_ = 0;
00673 deltaSumOfSquares_ = 0;
00674 deltaSumOfSizes_ = 0;
00675
00676 throughput_ =0.0;
00677 average_ = 0;
00678 rate_ = 0;
00679 rms_ = 0;
00680
00681 monLastN_ = 0;
00682 monLastSumOfSquares_= 0;
00683 monLastSumOfSizes_ = 0;
00684
00685 while (events_.size()) {
00686 delete events_.back();
00687 events_.pop_back();
00688 }
00689
00690 while (!rqstIds_.empty()) rqstIds_.pop();
00691 while (!freeIds_.empty()) freeIds_.pop();
00692 while (!builtIds_.empty()) builtIds_.pop();
00693 sentIds_.clear();
00694
00695 sem_init(&lock_,0,1);
00696 sem_init(&buildSem_,0,queueSize_);
00697 sem_init(&sendSem_,0,0);
00698 sem_init(&rqstSem_,0,0);
00699
00700 for (unsigned int i=0;i<queueSize_;i++) {
00701 events_.push_back(new BUEvent(i,eventBufferSize_));
00702 freeIds_.push(i);
00703 }
00704 }
00705
00706
00707
00708 double BU::deltaT(const struct timeval *start,const struct timeval *end)
00709 {
00710 unsigned int sec;
00711 unsigned int usec;
00712
00713 sec = end->tv_sec - start->tv_sec;
00714
00715 if(end->tv_usec > start->tv_usec) {
00716 usec = end->tv_usec - start->tv_usec;
00717 }
00718 else {
00719 sec--;
00720 usec = 1000000 - ((unsigned int )(start->tv_usec - end->tv_usec));
00721 }
00722
00723 return ((double)sec) + ((double)usec) / 1000000.0;
00724 }
00725
00726
00727
00728 bool BU::generateEvent(BUEvent* evt)
00729 {
00730
00731 if (replay_.value_&&nbEventsBuilt_>=(uint32_t)events_.size())
00732 {
00733 PlaybackRawDataProvider::instance()->setFreeToEof();
00734 return true;
00735 }
00736
00737 if (0!=PlaybackRawDataProvider::instance()) {
00738
00739 unsigned int runNumber,evtNumber;
00740
00741 FEDRawDataCollection* event=
00742 PlaybackRawDataProvider::instance()->getFEDRawData(runNumber,evtNumber);
00743 if(event == 0) return false;
00744 evt->initialize(evtNumber);
00745
00746 for (unsigned int i=0;i<validFedIds_.size();i++) {
00747 unsigned int fedId =validFedIds_[i];
00748 unsigned int fedSize=event->FEDData(fedId).size();
00749 unsigned char* fedAddr=event->FEDData(fedId).data();
00750 if (overwriteEvtId_.value_ && fedAddr != 0) {
00751 fedh_t *fedHeader=(fedh_t*)fedAddr;
00752 fedHeader->eventid=(fedHeader->eventid&0xFF000000)+(evtNumber&0x00FFFFFF);
00753 }
00754 if (fedSize>0) evt->writeFed(fedId,fedAddr,fedSize);
00755 }
00756 delete event;
00757 }
00758
00759 else {
00760 unsigned int evtNumber=(firstEvent_+evtNumber_++)%0x1000000;
00761 evt->initialize(evtNumber);
00762 unsigned int fedSizeMin=fedHeaderSize_+fedTrailerSize_;
00763 for (unsigned int i=0;i<validFedIds_.size();i++) {
00764 unsigned int fedId(validFedIds_[i]);
00765 unsigned int fedSize(fedSizeMean_);
00766 if (!useFixedFedSize_) {
00767 double logFedSize=CLHEP::RandGauss::shoot(gaussianMean_,gaussianWidth_);
00768 fedSize=(unsigned int)(std::exp(logFedSize));
00769 if (fedSize<fedSizeMin) fedSize=fedSizeMin;
00770 if (fedSize>fedSizeMax_) fedSize=fedSizeMax_;
00771 fedSize-=fedSize%8;
00772 }
00773
00774 evt->writeFed(fedId,0,fedSize);
00775 evt->writeFedHeader(i);
00776 evt->writeFedTrailer(i);
00777 }
00778
00779 }
00780 return true;
00781 }
00782
00783
00784
// Build the chain of I2O frames that carries one event to a requester.
//
// The FEDs of the event are distributed over up to 64 super-fragments. Each
// super-fragment is cut into blocks; every block is one frame of
// msgBufferSize_ bytes taken from i2oPool_, holding the I2O header, an FRL
// header and FED data. The frames are linked with setNextReference().
//
// evt          : event whose FED data is copied into the frames
// fuResourceId : stored as fuTransactionId in every block
// returns      : the first frame of the chain (0 if no frame was created)
00785 toolbox::mem::Reference *BU::createMsgChain(BUEvent* evt,
00786 unsigned int fuResourceId)
00787 {
// bytes available for FRL header + FED data in each frame
00788 unsigned int msgHeaderSize =sizeof(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME);
00789 unsigned int msgPayloadSize=msgBufferSize_-msgHeaderSize;
00790
// the condition is only logged; processing continues regardless
00791 if((msgPayloadSize%4)!=0) LOG4CPLUS_ERROR(log_,"Invalid Payload Size.");
00792
00793 toolbox::mem::Reference *head =0;
00794 toolbox::mem::Reference *tail =0;
00795 toolbox::mem::Reference *bufRef=0;
00796
00797 I2O_MESSAGE_FRAME *stdMsg=0;
00798 I2O_PRIVATE_MESSAGE_FRAME *pvtMsg=0;
00799 I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block =0;
00800
// iFed is the index of the FED currently being copied; it runs across all
// super-fragments. The two values derived from validFedIds_ are overwritten
// in both branches of the if/else that follows.
00801 unsigned int iFed =0;
00802 unsigned int nSuperFrag =64;
00803 unsigned int nFedPerSuperFrag=validFedIds_.size()/nSuperFrag;
00804 unsigned int nBigSuperFrags =validFedIds_.size()%nSuperFrag;
00805
// fewer FEDs than super-fragments: one FED per super-fragment; otherwise the
// FEDs are spread evenly and the first nBigSuperFrags get one extra FED
00806 if (evt->nFed()<nSuperFrag) {
00807 nSuperFrag=evt->nFed();
00808 nFedPerSuperFrag=1;
00809 nBigSuperFrags=0;
00810 }
00811 else
00812 {
00813 nFedPerSuperFrag=evt->nFed()/nSuperFrag;
00814 nBigSuperFrags =evt->nFed()%nSuperFrag;
00815 }
00816
00817 for (unsigned int iSuperFrag=0;iSuperFrag<nSuperFrag;iSuperFrag++) {
00818
00819
// nFed is the index one past the last FED of this super-fragment
00820 unsigned int nFed=iFed+nFedPerSuperFrag;
00821 if (iSuperFrag<nBigSuperFrags) ++nFed;
00822
00823
// first pass: add up the FED sizes plus one FRL header per block to estimate
// the number of blocks needed for this super-fragment
00824 unsigned int nBlock =0;
00825 unsigned int curbSize=frlHeaderSize_;
00826 unsigned int totSize =curbSize;
00827 for (unsigned int i=iFed;i<nFed;i++) {
00828 curbSize+=evt->fedSize(i);
00829 totSize+=evt->fedSize(i);
00830 if (curbSize>msgPayloadSize) {
00831 curbSize+=frlHeaderSize_*(curbSize/msgPayloadSize);
00832 if(curbSize%msgPayloadSize)totSize+=frlHeaderSize_*(curbSize/msgPayloadSize);
00833 else totSize+=frlHeaderSize_*((curbSize/msgPayloadSize)-1);
00834 curbSize=curbSize%msgPayloadSize;
00835 }
00836 }
00837 nBlock=totSize/msgPayloadSize+(totSize%msgPayloadSize>0 ? 1 : 0);
00838
00839
00840
// state carried from one block to the next:
//   remainder      - bytes of the current FED not yet copied
//   fedTrailerLeft - only the trailer of the current FED is still missing
//   last           - the block flagged FRL_LAST_SEGM has been written
//   warning        - nBlock had to be increased beyond the estimate
00841 unsigned int remainder =0;
00842 bool fedTrailerLeft=false;
00843 bool last =false;
00844 bool warning =false;
00845 unsigned char *startOfPayload=0;
00846 U32 payload(0);
00847
00848 for(unsigned int iBlock=0;iBlock<nBlock;iBlock++) {
00849
00850
00851 payload=msgPayloadSize;
00852
00853
// allocate the frame for this block
// NOTE(review): a failed allocation is only logged; bufRef then keeps its
// previous value (0 for the very first block) and is dereferenced below.
00854 try {
00855 bufRef=toolbox::mem::getMemoryPoolFactory()->getFrame(i2oPool_,
00856 msgBufferSize_);
00857 }
00858 catch(xcept::Exception &e) {
00859 LOG4CPLUS_FATAL(log_,"xdaq::frameAlloc failed");
00860 }
00861
00862
// the three pointers are different views of the same frame
00863 stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
00864 pvtMsg=(I2O_PRIVATE_MESSAGE_FRAME*)stdMsg;
00865 block =(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)stdMsg;
00866
00867 pvtMsg->XFunctionCode =I2O_FU_TAKE;
00868 pvtMsg->OrganizationID =XDAQ_ORGANIZATION_ID;
00869
// MessageSize holds the byte count shifted right by 2
00870 stdMsg->MessageSize =(msgHeaderSize + payload) >> 2;
00871 stdMsg->Function =I2O_PRIVATE_MESSAGE;
00872 stdMsg->VersionOffset =0;
00873 stdMsg->MsgFlags =0;
00874 stdMsg->InitiatorAddress=i2o::utils::getAddressMap()->getTid(buAppDesc_);
00875 stdMsg->TargetAddress =i2o::utils::getAddressMap()->getTid(fuAppDesc_);
00876
00877 block->buResourceId =evt->buResourceId();
00878 block->fuTransactionId =fuResourceId;
00879 block->blockNb =iBlock;
00880 block->nbBlocksInSuperFragment=nBlock;
00881 block->superFragmentNb =iSuperFrag;
00882 block->nbSuperFragmentsInEvent=nSuperFrag;
00883 block->eventNumber =evt->evtNumber();
00884
00885
// the FRL header sits directly behind the I2O header
00886 startOfPayload =(unsigned char*)block+msgHeaderSize;
00887 frlh_t* frlHeader=(frlh_t*)startOfPayload;
00888 frlHeader->trigno=evt->evtNumber();
00889 frlHeader->segno =iBlock;
00890
// segsize starts as the full space behind the FRL header; leftspace counts
// the bytes of that space not yet filled
00891 unsigned char *startOfFedBlocks=startOfPayload+frlHeaderSize_;
00892 payload -=frlHeaderSize_;
00893 frlHeader->segsize =payload;
00894 unsigned int leftspace=payload;
00895
00896
// case 1: only the trailer of the previous FED is still missing: copy it
00897 if(fedTrailerLeft) {
00898 memcpy(startOfFedBlocks,
00899 evt->fedAddr(iFed)+evt->fedSize(iFed)-fedTrailerSize_,
00900 fedTrailerSize_);
00901
00902 startOfFedBlocks+=fedTrailerSize_;
00903 leftspace -=fedTrailerSize_;
00904 remainder =0;
00905 fedTrailerLeft =false;
00906
00907
// that was the last FED of the super-fragment: shrink the frame to the bytes
// actually used and flag the segment as the last one
00908 if((iFed==nFed-1) && !last) {
00909 frlHeader->segsize-=leftspace;
00910 int msgSize=stdMsg->MessageSize << 2;
00911 msgSize -=leftspace;
00912 bufRef->setDataSize(msgSize);
00913 stdMsg->MessageSize = msgSize >> 2;
00914 frlHeader->segsize=frlHeader->segsize | FRL_LAST_SEGM;
00915 last=true;
00916 }
00917
00918
00919 iFed++;
00920 }
00921
// case 2: part of the current FED is still to be copied
00925 if (remainder>0) {
00926
00927
// 2a: the rest of the FED fits into this block
00928 if(payload>=remainder) {
00929 memcpy(startOfFedBlocks,
00930 evt->fedAddr(iFed)+evt->fedSize(iFed)-remainder,
00931 remainder);
00932
00933 startOfFedBlocks+=remainder;
00934 leftspace -=remainder;
00935
00936
00937 if(iFed==nFed-1) {
00938 frlHeader->segsize-=leftspace;
00939 int msgSize=stdMsg->MessageSize << 2;
00940 msgSize -=leftspace;
00941 bufRef->setDataSize(msgSize);
00942 stdMsg->MessageSize = msgSize >> 2;
00943 frlHeader->segsize=frlHeader->segsize | FRL_LAST_SEGM;
00944 last=true;
00945 }
00946
00947
00948 iFed++;
00949
00950
00951 remainder=0;
00952 }
00953
// 2b: everything but the trailer fits; the trailer goes into the next block
00954 else if (payload>=(remainder-fedTrailerSize_)) {
00955 memcpy(startOfFedBlocks,
00956 evt->fedAddr(iFed)+evt->fedSize(iFed)-remainder,
00957 remainder-fedTrailerSize_);
00958
00959 frlHeader->segsize=remainder-fedTrailerSize_;
00960 fedTrailerLeft =true;
00961 leftspace -=(remainder-fedTrailerSize_);
00962 remainder =fedTrailerSize_;
00963 }
00964
// 2c: the block is filled completely and the FED continues in the next one
00965 else {
00966 memcpy(startOfFedBlocks,
00967 evt->fedAddr(iFed)+evt->fedSize(iFed)-remainder,payload);
00968 remainder-=payload;
00969 leftspace =0;
00970 }
00971 }
00972
// case 3: no FED is pending: start copying new FEDs into the free space
00976 if(remainder==0) {
00977
00978
00979 while(iFed<nFed) {
00980
00981
// not even a FED header fits: give up the rest of this block
00982 if((int)leftspace<fedHeaderSize_) {
00983 frlHeader->segsize-=leftspace;
00984 break;
00985 }
00986
00987 memcpy(startOfFedBlocks,evt->fedAddr(iFed),fedHeaderSize_);
00988
00989 leftspace -=fedHeaderSize_;
00990 startOfFedBlocks+=fedHeaderSize_;
00991
00992
// 3a: the rest of the FED (payload and trailer) fits
00993 if(evt->fedSize(iFed)-fedHeaderSize_<=leftspace) {
00994 memcpy(startOfFedBlocks,
00995 evt->fedAddr(iFed)+fedHeaderSize_,
00996 evt->fedSize(iFed)-fedHeaderSize_);
00997
00998 leftspace -=(evt->fedSize(iFed)-fedHeaderSize_);
00999 startOfFedBlocks+=(evt->fedSize(iFed)-fedHeaderSize_);
01000 }
01001
// 3b: everything but the trailer fits; the trailer goes into the next block
01002 else if(evt->fedSize(iFed)-fedHeaderSize_-fedTrailerSize_<=leftspace) {
01003 memcpy(startOfFedBlocks,
01004 evt->fedAddr(iFed)+fedHeaderSize_,
01005 evt->fedSize(iFed)-fedHeaderSize_-fedTrailerSize_);
01006
01007 leftspace -=(evt->fedSize(iFed)-fedHeaderSize_-fedTrailerSize_);
01008 frlHeader->segsize-=leftspace;
01009 fedTrailerLeft =true;
01010 remainder =fedTrailerSize_;
01011
01012 break;
01013 }
01014
// 3c: fill the block completely and remember how much of the FED is left
01015 else {
01016 memcpy(startOfFedBlocks,evt->fedAddr(iFed)+fedHeaderSize_,leftspace);
01017 remainder=evt->fedSize(iFed)-fedHeaderSize_-leftspace;
01018 leftspace=0;
01019
01020 break;
01021 }
01022
01023
01024 iFed++;
01025
01026 }
01027
01028
// all FEDs of the super-fragment are written: shrink the frame to the bytes
// actually used and flag the segment as the last one
01029 if (iFed==nFed && remainder==0 && !last) {
01030 frlHeader->segsize-=leftspace;
01031 int msgSize=stdMsg->MessageSize << 2;
01032 msgSize -=leftspace;
01033 bufRef->setDataSize(msgSize);
01034 stdMsg->MessageSize=msgSize >> 2;
01035 frlHeader->segsize =frlHeader->segsize | FRL_LAST_SEGM;
01036 last=true;
01037 }
01038
01039 }
01040
// append the frame to the chain (the very first frame becomes its head)
01041 if(iSuperFrag==0&&iBlock==0) {
01042 head=bufRef;
01043 tail=bufRef;
01044 }
01045 else {
01046 tail->setNextReference(bufRef);
01047 tail=bufRef;
01048 }
01049
// data is still pending after the last estimated block: add one more block
// and remember that the block count stored in the frames is now wrong
01050 if((iBlock==nBlock-1) && remainder!=0) {
01051 nBlock++;
01052 warning=true;
01053 }
01054
01055 }
01056
01057
// the estimate was too low: walk the chain from its head and correct the
// block count in every frame belonging to this super-fragment
01058 if(warning) {
01059 toolbox::mem::Reference* next=head;
01060 do {
01061 block =(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)next->getDataLocation();
01062 if (block->superFragmentNb==iSuperFrag)
01063 block->nbBlocksInSuperFragment=nBlock;
01064 } while((next=next->getNextReference()));
01065 }
01066
01067 }
01068
01069 return head;
01070 }
01071
01072
01073 void BU::dumpFrame(unsigned char* data,unsigned int len)
01074 {
01075 char left1[20];
01076 char left2[20];
01077 char right1[20];
01078 char right2[20];
01079
01080 printf("Byte 0 1 2 3 4 5 6 7\n");
01081
01082 int c(0);
01083 int pos(0);
01084
01085 for (unsigned int i=0;i<(len/8);i++) {
01086 int rpos(0);
01087 int off(3);
01088 for (pos=0;pos<12;pos+=3) {
01089 sprintf(&left1[pos],"%2.2x ",
01090 ((unsigned char*)data)[c+off]);
01091 sprintf(&right1[rpos],"%1c",
01092 ((data[c+off] > 32)&&(data[c+off] < 127)) ? data[c+off] : '.');
01093 sprintf (&left2[pos],"%2.2x ",
01094 ((unsigned char*)data)[c+off+4]);
01095 sprintf (&right2[rpos],"%1c",
01096 ((data[c+off+4] > 32)&&(data[c+off+4]<127)) ? data[c+off+4] : '.');
01097 rpos++;
01098 off--;
01099 }
01100 c+=8;
01101
01102 printf ("%4d: %s%s || %s%s %p\n",
01103 c-8, left1, left2, right1, right2, &data[c-8]);
01104 }
01105
01106 fflush(stdout);
01107 }
01108
01109
01111
01113
// Expands the XDAQ instantiator macro for class BU.
// NOTE(review): presumably this provides the factory hook through which the
// XDAQ executive creates BU instances -- confirm against the macro definition.
01114 XDAQ_INSTANTIATOR_IMPL(BU)