CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_0/src/EventFilter/AutoBU/src/BU.cc

Go to the documentation of this file.
00001 
00002 //
00003 // BU
00004 // --
00005 //
00006 //                                         Emilio Meschi <emilio.meschi@cern.ch>
00007 //                       Philipp Schieferdecker <philipp.schieferdecker@cern.ch>
00009 
00010 
00011 #include "EventFilter/AutoBU/interface/BU.h"
00012 
00013 #include "FWCore/Utilities/interface/CRC16.h"
00014 
00015 #include "DataFormats/FEDRawData/interface/FEDNumbering.h"
00016 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
00017 
00018 #include "xoap/SOAPEnvelope.h"
00019 #include "xoap/SOAPBody.h"
00020 #include "xoap/domutils.h"
00021 
00022 #include <netinet/in.h>
00023 #include <sstream>
00024 
00025 
00026 using namespace std;
00027 using namespace evf;
00028 
00029 
00031 // construction/destruction
00033 
00034 //______________________________________________________________________________
00035 BU::BU(xdaq::ApplicationStub *s) 
00036   : xdaq::Application(s)
00037   , log_(getApplicationLogger())
00038   , buAppDesc_(getApplicationDescriptor())
00039   , fuAppDesc_(0)
00040   , buAppContext_(getApplicationContext())
00041   , fsm_(this)
00042   , gui_(0)
00043   , evtNumber_(0)
00044   , isBuilding_(false)
00045   , isSending_(false)
00046   , isHalting_(false)
00047   , wlBuilding_(0)
00048   , asBuilding_(0)
00049   , wlSending_(0)
00050   , asSending_(0)
00051   , wlMonitoring_(0)
00052   , asMonitoring_(0)
00053   , instance_(0)
00054   , runNumber_(0)
00055   , memUsedInMB_(0.0)
00056   , deltaT_(0.0)
00057   , deltaN_(0)
00058   , deltaSumOfSquares_(0)
00059   , deltaSumOfSizes_(0)
00060   , throughput_(0.0)
00061   , average_(0.0)
00062   , rate_(0.0)
00063   , rms_(0.0)
00064   , nbEventsInBU_(0)
00065   , nbEventsRequested_(0)
00066   , nbEventsBuilt_(0)
00067   , nbEventsSent_(0)
00068   , nbEventsDiscarded_(0)
00069   , mode_("RANDOM")
00070   , replay_(false)
00071   , crc_(true)
00072   , overwriteEvtId_(true)
00073   , firstEvent_(1)
00074   , queueSize_(32)
00075   , eventBufferSize_(0x400000)
00076   , msgBufferSize_(32768)
00077   , fedSizeMax_(65536)
00078   , fedSizeMean_(1024)
00079   , fedSizeWidth_(1024)
00080   , useFixedFedSize_(false)
00081   , monSleepSec_(1)
00082   , gaussianMean_(0.0)
00083   , gaussianWidth_(1.0)
00084   , monLastN_(0)
00085   , monLastSumOfSquares_(0)
00086   , monLastSumOfSizes_(0)
00087   , sumOfSquares_(0)
00088   , sumOfSizes_(0)
00089   , i2oPool_(0)
00090 {
00091   // initialize state machine
00092   fsm_.initialize<evf::BU>(this);
00093   
00094   // initialize application info
00095   url_     =
00096     getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00097     getApplicationDescriptor()->getURN();
00098   class_   =getApplicationDescriptor()->getClassName();
00099   instance_=getApplicationDescriptor()->getInstance();
00100   hostname_=getApplicationDescriptor()->getContextDescriptor()->getURL();
00101   sourceId_=class_.toString()+instance_.toString();
00102   
00103   // i2o callbacks
00104   i2o::bind(this,&BU::I2O_BU_ALLOCATE_Callback,I2O_BU_ALLOCATE,XDAQ_ORGANIZATION_ID);
00105   i2o::bind(this,&BU::I2O_BU_DISCARD_Callback, I2O_BU_DISCARD, XDAQ_ORGANIZATION_ID);
00106   
00107   // allocate i2o memery pool
00108   string i2oPoolName=sourceId_+"_i2oPool";
00109   try {
00110     toolbox::mem::HeapAllocator *allocator=new toolbox::mem::HeapAllocator();
00111     toolbox::net::URN urn("toolbox-mem-pool",i2oPoolName);
00112     toolbox::mem::MemoryPoolFactory* poolFactory=
00113       toolbox::mem::getMemoryPoolFactory();
00114     i2oPool_=poolFactory->createPool(urn,allocator);
00115   }
00116   catch (toolbox::mem::exception::Exception& e) {
00117     string s="Failed to create pool: "+i2oPoolName;
00118     LOG4CPLUS_FATAL(log_,s);
00119     XCEPT_RETHROW(xcept::Exception,s,e);
00120   }
00121   
00122   // web interface
00123   xgi::bind(this,&evf::BU::webPageRequest,"Default");
00124   gui_=new WebGUI(this,&fsm_);
00125   gui_->setSmallAppIcon("/rubuilder/bu/images/bu32x32.gif");
00126   gui_->setLargeAppIcon("/rubuilder/bu/images/bu64x64.gif");
00127   
00128   vector<toolbox::lang::Method*> methods=gui_->getMethods();
00129   vector<toolbox::lang::Method*>::iterator it;
00130   for (it=methods.begin();it!=methods.end();++it) {
00131     if ((*it)->type()=="cgi") {
00132       string name=static_cast<xgi::MethodSignature*>(*it)->name();
00133       xgi::bind(this,&evf::BU::webPageRequest,name);
00134     }
00135   }
00136   xgi::bind(this,&evf::BU::customWebPage,"customWebPage");
00137   
00138   
00139   // export parameters to info space(s)
00140   exportParameters();
00141 
00142   // findRcmsStateListener
00143   fsm_.findRcmsStateListener();
00144   
00145   // compute parameters for fed size generation (a la Emilio)
00146   gaussianMean_ =std::log((double)fedSizeMean_);
00147   gaussianWidth_=std::sqrt(std::log
00148                            (0.5*
00149                             (1+std::sqrt
00150                              (1.0+4.0*
00151                               fedSizeWidth_.value_*fedSizeWidth_.value_/
00152                               fedSizeMean_.value_/fedSizeMean_.value_))));
00153 
00154   // start monitoring thread, once and for all
00155   startMonitoringWorkLoop();
00156   
00157   // propagate crc flag to BUEvent
00158   BUEvent::setComputeCrc(crc_.value_);
00159 }
00160 
00161 
00162 //______________________________________________________________________________
00163 BU::~BU()
00164 {
00165   while (!events_.empty()) { delete events_.back(); events_.pop_back(); }
00166 }
00167 
00168 
00170 // implementation of member functions
00172 
00173 //______________________________________________________________________________
00174 bool BU::configuring(toolbox::task::WorkLoop* wl)
00175 {
00176   isHalting_=false;
00177   try {
00178     LOG4CPLUS_INFO(log_,"Start configuring ...");
00179     reset();
00180     LOG4CPLUS_INFO(log_,"Finished configuring!");
00181     fsm_.fireEvent("ConfigureDone",this);
00182   }
00183   catch (xcept::Exception &e) {
00184     string msg = "configuring FAILED: " + (string)e.what();
00185     fsm_.fireFailed(msg,this);
00186   }
00187 
00188   return false;
00189 }
00190 
00191 
00192 //______________________________________________________________________________
00193 bool BU::enabling(toolbox::task::WorkLoop* wl)
00194 {
00195   isHalting_=false;
00196   try {
00197     LOG4CPLUS_INFO(log_,"Start enabling ...");
00198     // determine valid fed ids (assumes Playback EP is already configured hence PBRDP::instance 
00199     // not null in case we are playing back)
00200     if (0!=PlaybackRawDataProvider::instance()) {
00201       for (unsigned int i=0;i<(unsigned int)FEDNumbering::MAXFEDID+1;i++)
00202         if (FEDNumbering::inRange(i)) validFedIds_.push_back(i);
00203     }
00204     else{
00205       for (unsigned int i=0;i<(unsigned int)FEDNumbering::MAXFEDID+1;i++)
00206         if (FEDNumbering::inRangeNoGT(i)) validFedIds_.push_back(i);
00207     }
00208     if (!isBuilding_) startBuildingWorkLoop();
00209     if (!isSending_)  startSendingWorkLoop();
00210     LOG4CPLUS_INFO(log_,"Finished enabling!");
00211     fsm_.fireEvent("EnableDone",this);
00212   }
00213   catch (xcept::Exception &e) {
00214     string msg = "enabling FAILED: " + (string)e.what();
00215     fsm_.fireFailed(msg,this);
00216   }
00217   
00218   return false;
00219 }
00220 
00221 
00222 //______________________________________________________________________________
00223 bool BU::stopping(toolbox::task::WorkLoop* wl)
00224 {
00225   try {
00226     LOG4CPLUS_INFO(log_,"Start stopping :) ...");
00227 
00228     if (0!=PlaybackRawDataProvider::instance()&&
00229         (!replay_.value_||nbEventsBuilt_<(uint32_t)events_.size())) { 
00230       lock();
00231       freeIds_.push(events_.size()); 
00232       unlock();
00233       postBuild();
00234       while (!builtIds_.empty()) {
00235         LOG4CPLUS_INFO(log_,"wait to flush ... #builtIds="<<builtIds_.size());
00236 	::sleep(1);
00237       }
00238       // let the playback go to the last event and exit
00239       PlaybackRawDataProvider::instance()->setFreeToEof(); 
00240       while (!PlaybackRawDataProvider::instance()->areFilesClosed()) usleep(1000000);
00241       usleep(100000);
00242     }
00243     
00244     lock();
00245     builtIds_.push(events_.size());
00246     unlock();
00247 
00248     postSend();
00249     while (!sentIds_.empty()) {
00250       LOG4CPLUS_INFO(log_,"wait to flush ...");
00251       ::sleep(1);
00252     }
00253     reset();
00254     /* this is not needed and should not run if reset is called
00255     if (0!=PlaybackRawDataProvider::instance()&&
00256         (replay_.value_&&nbEventsBuilt_>=(uint32_t)events_.size())) {
00257       lock();
00258       freeIds_.push(events_.size());
00259       unlock();
00260       postBuild();
00261     }
00262     */
00263     LOG4CPLUS_INFO(log_,"Finished stopping!");
00264     fsm_.fireEvent("StopDone",this);
00265   }
00266   catch (xcept::Exception &e) {
00267     string msg = "stopping FAILED: " + (string)e.what();
00268     fsm_.fireFailed(msg,this);
00269   }
00270   return false;
00271 }
00272 
00273 
00274 //______________________________________________________________________________
00275 bool BU::halting(toolbox::task::WorkLoop* wl)
00276 {
00277   try {
00278     LOG4CPLUS_INFO(log_,"Start halting ...");
00279     isHalting_=true;
00280     if (isBuilding_&&isSending_) {
00281       lock();
00282       freeIds_.push(events_.size());
00283       builtIds_.push(events_.size());
00284       unlock();
00285       postBuild();
00286       postSend();
00287     }
00288     if (0!=PlaybackRawDataProvider::instance()&&
00289         (!replay_.value_||nbEventsBuilt_<(uint32_t)events_.size())) { 
00290       PlaybackRawDataProvider::instance()->setFreeToEof();
00291       while (!PlaybackRawDataProvider::instance()->areFilesClosed()) usleep(1000000);
00292       usleep(100000);
00293     }
00294     LOG4CPLUS_INFO(log_,"Finished halting!");
00295     fsm_.fireEvent("HaltDone",this);
00296   }
00297   catch (xcept::Exception &e) {
00298     string msg = "halting FAILED: " + (string)e.what();
00299     fsm_.fireFailed(msg,this);
00300   }
00301   return false;
00302 }
00303 
00304 
00305 //______________________________________________________________________________
00306 xoap::MessageReference BU::fsmCallback(xoap::MessageReference msg)
00307   throw (xoap::exception::Exception)
00308 {
00309   return fsm_.commandCallback(msg);
00310 }
00311 
00312 
00313 //______________________________________________________________________________
00314 void BU::I2O_BU_ALLOCATE_Callback(toolbox::mem::Reference *bufRef)
00315 {
00316   if (isHalting_) {
00317     LOG4CPLUS_WARN(log_,"Ignore BU_ALLOCATE message while halting.");
00318     bufRef->release();
00319     return;
00320   }
00321   
00322   I2O_MESSAGE_FRAME             *stdMsg;
00323   I2O_BU_ALLOCATE_MESSAGE_FRAME *msg;
00324   
00325   stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
00326   msg   =(I2O_BU_ALLOCATE_MESSAGE_FRAME*)stdMsg;
00327   
00328   if (0==fuAppDesc_) {
00329     I2O_TID fuTid=stdMsg->InitiatorAddress;
00330     fuAppDesc_=i2o::utils::getAddressMap()->getApplicationDescriptor(fuTid);
00331   }
00332   
00333   for (unsigned int i=0;i<msg->n;i++) {
00334     unsigned int fuResourceId=msg->allocate[i].fuTransactionId;
00335     lock();
00336     rqstIds_.push(fuResourceId);
00337     postRqst();
00338     nbEventsRequested_++;
00339     nbEventsInBU_++;
00340     unlock();
00341   }
00342 
00343   bufRef->release();
00344 }
00345 
00346 
00347 //______________________________________________________________________________
00348 void BU::I2O_BU_DISCARD_Callback(toolbox::mem::Reference *bufRef)
00349 {
00350   if (isHalting_) {
00351     LOG4CPLUS_WARN(log_,"Ignore BU_DISCARD message while halting.");
00352     bufRef->release();
00353     return;
00354   }
00355 
00356   I2O_MESSAGE_FRAME           *stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
00357   I2O_BU_DISCARD_MESSAGE_FRAME*msg   =(I2O_BU_DISCARD_MESSAGE_FRAME*)stdMsg;
00358   unsigned int buResourceId=msg->buResourceId[0];
00359 
00360   lock();
00361   int result=sentIds_.erase(buResourceId);
00362   unlock();
00363   
00364   if (!result) {
00365     LOG4CPLUS_ERROR(log_,"can't discard unknown buResourceId '"<<buResourceId<<"'");
00366   }
00367   else {
00368     lock();
00369     freeIds_.push(buResourceId);
00370     nbEventsDiscarded_.value_++;
00371     unlock();
00372     postBuild();
00373   }
00374   
00375   bufRef->release();
00376 }
00377 
00378 
00379 //______________________________________________________________________________
00380 void BU::actionPerformed(xdata::Event& e)
00381 {
00382   gui_->monInfoSpace()->lock();
00383   if (e.type()=="urn:xdata-event:ItemGroupRetrieveEvent") {
00384     mode_=(0==PlaybackRawDataProvider::instance())?"RANDOM":"PLAYBACK";
00385     if (0!=i2oPool_) memUsedInMB_=i2oPool_->getMemoryUsage().getUsed()*9.53674e-07;
00386     else             memUsedInMB_=0.0;
00387   }
00388   else if (e.type()=="ItemChangedEvent") {
00389     string item=dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00390     if (item=="crc") BUEvent::setComputeCrc(crc_.value_);
00391   }
00392   gui_->monInfoSpace()->unlock();
00393 }
00394 
00395 
00396 //______________________________________________________________________________
00397 void BU::webPageRequest(xgi::Input *in,xgi::Output *out)
00398   throw (xgi::exception::Exception)
00399 {
00400   string name=in->getenv("PATH_INFO");
00401   if (name.empty()) name="defaultWebPage";
00402   static_cast<xgi::MethodSignature*>(gui_->getMethod(name))->invoke(in,out);
00403 }
00404 
00405 
00406 //______________________________________________________________________________
00407 void BU::customWebPage(xgi::Input*in,xgi::Output*out)
00408   throw (xgi::exception::Exception)
00409 {
00410   *out<<"<html></html>"<<endl;
00411 }
00412 
00413 
00414 //______________________________________________________________________________
00415 void BU::startBuildingWorkLoop() throw (evf::Exception)
00416 {
00417   try {
00418     LOG4CPLUS_INFO(log_,"Start 'building' workloop");
00419     wlBuilding_=
00420       toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+
00421                                                        "Building",
00422                                                        "waiting");
00423     if (!wlBuilding_->isActive()) wlBuilding_->activate();
00424     asBuilding_=toolbox::task::bind(this,&BU::building,sourceId_+"Building");
00425     wlBuilding_->submit(asBuilding_);
00426     isBuilding_=true;
00427   }
00428   catch (xcept::Exception& e) {
00429     string msg = "Failed to start workloop 'building'.";
00430     XCEPT_RETHROW(evf::Exception,msg,e);
00431   }
00432 }
00433 
00434 
00435 //______________________________________________________________________________
00436 bool BU::building(toolbox::task::WorkLoop* wl)
00437 {
00438   waitBuild();
00439   lock();
00440   unsigned int buResourceId=freeIds_.front(); freeIds_.pop();
00441   unlock();
00442   
00443   if (buResourceId>=(uint32_t)events_.size()) {
00444     LOG4CPLUS_INFO(log_,"shutdown 'building' workloop.");
00445     isBuilding_=false;
00446     return false;
00447   }
00448   
00449   if (!isHalting_) {
00450     BUEvent* evt=events_[buResourceId];
00451     if(generateEvent(evt)) {
00452       lock();
00453       nbEventsBuilt_++;
00454       builtIds_.push(buResourceId);
00455       unlock();
00456       
00457       postSend();
00458     }
00459     else {
00460       LOG4CPLUS_INFO(log_,"building:received null post");
00461       lock();
00462       unsigned int saveBUResourceId = buResourceId;
00463       //buResourceId = freeIds_.front(); freeIds_.pop();
00464       freeIds_.push(saveBUResourceId);
00465       unlock();
00466       isBuilding_=false;
00467       return false;
00468     }
00469   }
00470   return true;
00471 }
00472 
00473 
00474 //______________________________________________________________________________
00475 void BU::startSendingWorkLoop() throw (evf::Exception)
00476 {
00477   try {
00478     LOG4CPLUS_INFO(log_,"Start 'sending' workloop");
00479     wlSending_=toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+
00480                                                                 "Sending",
00481                                                                 "waiting");
00482     if (!wlSending_->isActive()) wlSending_->activate();
00483 
00484     asSending_=toolbox::task::bind(this,&BU::sending,sourceId_+"Sending");
00485     wlSending_->submit(asSending_);
00486     isSending_=true;
00487   }
00488   catch (xcept::Exception& e) {
00489     string msg = "Failed to start workloop 'sending'.";
00490     XCEPT_RETHROW(evf::Exception,msg,e);
00491   }
00492 }
00493 
00494 
00495 //______________________________________________________________________________
00496 bool BU::sending(toolbox::task::WorkLoop* wl)
00497 {
00498   waitSend();
00499   lock();
00500   unsigned int buResourceId=builtIds_.front(); builtIds_.pop();
00501   unlock();
00502   
00503   if (buResourceId>=(uint32_t)events_.size()) {
00504     LOG4CPLUS_INFO(log_,"shutdown 'sending' workloop.");
00505     isSending_=false;
00506     return false;
00507   }
00508 
00509   if (!isHalting_) {
00510     waitRqst();
00511     lock();
00512     unsigned int fuResourceId=rqstIds_.front(); rqstIds_.pop();
00513     unlock();
00514     
00515     BUEvent* evt=events_[buResourceId];
00516     toolbox::mem::Reference* msg=createMsgChain(evt,fuResourceId);
00517     
00518     lock();
00519     sumOfSquares_+=(uint64_t)evt->evtSize()*(uint64_t)evt->evtSize();
00520     sumOfSizes_  +=evt->evtSize();
00521     nbEventsInBU_--;
00522     nbEventsSent_++;
00523     sentIds_.insert(buResourceId);
00524     unlock();
00525     
00526     buAppContext_->postFrame(msg,buAppDesc_,fuAppDesc_);  
00527   }
00528   
00529   return true;
00530 }
00531 
00532 
00533 //______________________________________________________________________________
00534 void BU::startMonitoringWorkLoop() throw (evf::Exception)
00535 {
00536   struct timezone timezone;
00537   gettimeofday(&monStartTime_,&timezone);
00538   
00539   try {
00540     LOG4CPLUS_INFO(log_,"Start 'monitoring' workloop");
00541     wlMonitoring_=
00542       toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+
00543                                                        "Monitoring",
00544                                                        "waiting");
00545     if (!wlMonitoring_->isActive()) wlMonitoring_->activate();
00546     asMonitoring_=toolbox::task::bind(this,&BU::monitoring,sourceId_+"Monitoring");
00547     wlMonitoring_->submit(asMonitoring_);
00548   }
00549   catch (xcept::Exception& e) {
00550     string msg = "Failed to start workloop 'monitoring'.";
00551     XCEPT_RETHROW(evf::Exception,msg,e);
00552   }
00553 }
00554 
00555 
00556 //______________________________________________________________________________
00557 bool BU::monitoring(toolbox::task::WorkLoop* wl)
00558 {
00559   struct timeval  monEndTime;
00560   struct timezone timezone;
00561   
00562   gettimeofday(&monEndTime,&timezone);
00563   
00564   lock();
00565   unsigned int monN           =nbEventsBuilt_.value_;
00566   uint64_t     monSumOfSquares=sumOfSquares_;
00567   unsigned int monSumOfSizes  =sumOfSizes_;
00568   uint64_t     deltaSumOfSquares;
00569   unlock();
00570   
00571   gui_->monInfoSpace()->lock();
00572   
00573   deltaT_.value_=deltaT(&monStartTime_,&monEndTime);
00574   monStartTime_=monEndTime;
00575   
00576   deltaN_.value_=monN-monLastN_;
00577   monLastN_=monN;
00578 
00579   deltaSumOfSquares=monSumOfSquares-monLastSumOfSquares_;
00580   deltaSumOfSquares_.value_=(double)deltaSumOfSquares;
00581   monLastSumOfSquares_=monSumOfSquares;
00582   
00583   deltaSumOfSizes_.value_=monSumOfSizes-monLastSumOfSizes_;
00584   monLastSumOfSizes_=monSumOfSizes;
00585   
00586   if (deltaT_.value_!=0) {
00587     throughput_=deltaSumOfSizes_.value_/deltaT_.value_;
00588     rate_      =deltaN_.value_/deltaT_.value_;
00589   }
00590   else {
00591     throughput_=0.0;
00592     rate_      =0.0;
00593   }
00594   
00595   double meanOfSquares,mean,squareOfMean,variance;
00596   
00597   if(deltaN_.value_!=0) {
00598     meanOfSquares=deltaSumOfSquares_.value_/((double)(deltaN_.value_));
00599     mean=((double)(deltaSumOfSizes_.value_))/((double)(deltaN_.value_));
00600     squareOfMean=mean*mean;
00601     variance=meanOfSquares-squareOfMean;
00602     average_=deltaSumOfSizes_.value_/deltaN_.value_;
00603     rms_    =std::sqrt(variance);
00604   }
00605   else {
00606     average_=0.0;
00607     rms_    =0.0;
00608   }
00609 
00610   gui_->monInfoSpace()->unlock();
00611   
00612   ::sleep(monSleepSec_.value_);
00613 
00614   return true;
00615 }
00616     
00617 
00618 
00620 // implementation of private member functions
00622 
00623 //______________________________________________________________________________
00624 void BU::exportParameters()
00625 {
00626   if (0==gui_) {
00627     LOG4CPLUS_ERROR(log_,"No GUI, can't export parameters");
00628     return;
00629   }
00630   
00631   gui_->addMonitorParam("url",                &url_);
00632   gui_->addMonitorParam("class",              &class_);
00633   gui_->addMonitorParam("instance",           &instance_);
00634   gui_->addMonitorParam("hostname",           &hostname_);
00635   gui_->addMonitorParam("runNumber",          &runNumber_);
00636   gui_->addMonitorParam("stateName",          fsm_.stateName());
00637   gui_->addMonitorParam("memUsedInMB",        &memUsedInMB_);
00638   gui_->addMonitorParam("deltaT",             &deltaT_);
00639   gui_->addMonitorParam("deltaN",             &deltaN_);
00640   gui_->addMonitorParam("deltaSumOfSquares",  &deltaSumOfSquares_);
00641   gui_->addMonitorParam("deltaSumOfSizes",    &deltaSumOfSizes_);
00642   gui_->addMonitorParam("throughput",         &throughput_);
00643   gui_->addMonitorParam("average",            &average_);
00644   gui_->addMonitorParam("rate",               &rate_);
00645   gui_->addMonitorParam("rms",                &rms_);
00646 
00647   gui_->addMonitorCounter("nbEvtsInBU",       &nbEventsInBU_);
00648   gui_->addMonitorCounter("nbEvtsRequested",  &nbEventsRequested_);
00649   gui_->addMonitorCounter("nbEvtsBuilt",      &nbEventsBuilt_);
00650   gui_->addMonitorCounter("nbEvtsSent",       &nbEventsSent_);
00651   gui_->addMonitorCounter("nbEvtsDiscarded",  &nbEventsDiscarded_);
00652 
00653   gui_->addStandardParam("mode",              &mode_);
00654   gui_->addStandardParam("replay",            &replay_);
00655   gui_->addStandardParam("overwriteEvtId",    &overwriteEvtId_);
00656   gui_->addStandardParam("crc",               &crc_);
00657   gui_->addStandardParam("firstEvent",        &firstEvent_);
00658   gui_->addStandardParam("queueSize",         &queueSize_);
00659   gui_->addStandardParam("eventBufferSize",   &eventBufferSize_);
00660   gui_->addStandardParam("msgBufferSize",     &msgBufferSize_);
00661   gui_->addStandardParam("fedSizeMax",        &fedSizeMax_);
00662   gui_->addStandardParam("fedSizeMean",       &fedSizeMean_);
00663   gui_->addStandardParam("fedSizeWidth",      &fedSizeWidth_);
00664   gui_->addStandardParam("useFixedFedSize",   &useFixedFedSize_);
00665   gui_->addStandardParam("monSleepSec",       &monSleepSec_);
00666   gui_->addStandardParam("rcmsStateListener",     fsm_.rcmsStateListener());
00667   gui_->addStandardParam("foundRcmsStateListener",fsm_.foundRcmsStateListener());
00668 
00669   
00670   gui_->exportParameters();
00671 
00672   gui_->addItemChangedListener("crc",this);
00673   
00674 }
00675 
00676 
00677 //______________________________________________________________________________
00678 void BU::reset()
00679 {
00680   gui_->resetCounters();
00681   
00682   deltaT_             =0.0;
00683   deltaN_             =  0;
00684   deltaSumOfSquares_  =  0;
00685   deltaSumOfSizes_    =  0;
00686   
00687   throughput_         =0.0;
00688   average_            =  0;
00689   rate_               =  0;
00690   rms_                =  0;
00691 
00692   monLastN_           =  0;
00693   monLastSumOfSquares_=  0;
00694   monLastSumOfSizes_  =  0;
00695   
00696   while (events_.size()) {
00697     delete events_.back();
00698     events_.pop_back();
00699   }
00700   
00701   while (!rqstIds_.empty())  rqstIds_.pop();
00702   while (!freeIds_.empty())  freeIds_.pop();
00703   while (!builtIds_.empty()) builtIds_.pop();
00704   sentIds_.clear();
00705  
00706   sem_init(&lock_,0,1);
00707   sem_init(&buildSem_,0,queueSize_);
00708   sem_init(&sendSem_,0,0);
00709   sem_init(&rqstSem_,0,0);
00710   
00711   for (unsigned int i=0;i<queueSize_;i++) {
00712     events_.push_back(new BUEvent(i,eventBufferSize_));
00713     freeIds_.push(i);
00714   }
00715   validFedIds_.clear();
00716 }
00717 
00718 //______________________________________________________________________________
00719 double BU::deltaT(const struct timeval *start,const struct timeval *end)
00720 {
00721   unsigned int  sec;
00722   unsigned int  usec;
00723   
00724   sec = end->tv_sec - start->tv_sec;
00725   
00726   if(end->tv_usec > start->tv_usec) {
00727     usec = end->tv_usec - start->tv_usec;
00728   }
00729   else {
00730     sec--;
00731     usec = 1000000 - ((unsigned int )(start->tv_usec - end->tv_usec));
00732   }
00733   
00734   return ((double)sec) + ((double)usec) / 1000000.0;
00735 }
00736 
00737 
00738 //______________________________________________________________________________
00739 bool BU::generateEvent(BUEvent* evt)
00740 {
00741   // replay?
00742   if (replay_.value_&&nbEventsBuilt_>=(uint32_t)events_.size()) 
00743     {
00744       if (0!=PlaybackRawDataProvider::instance())
00745         PlaybackRawDataProvider::instance()->setFreeToEof();
00746       return true;
00747     }  
00748   // PLAYBACK mode
00749   if (0!=PlaybackRawDataProvider::instance()) {
00750     
00751     unsigned int runNumber,evtNumber;
00752 
00753     FEDRawDataCollection* event=
00754       PlaybackRawDataProvider::instance()->getFEDRawData(runNumber,evtNumber);
00755     if(event == 0) return false;
00756     evt->initialize(evtNumber);
00757     
00758     for (unsigned int i=0;i<validFedIds_.size();i++) {
00759       unsigned int   fedId  =validFedIds_[i];
00760       unsigned int   fedSize=event->FEDData(fedId).size();
00761       unsigned char* fedAddr=event->FEDData(fedId).data();
00762       if (overwriteEvtId_.value_ && fedAddr != 0) {
00763         fedh_t *fedHeader=(fedh_t*)fedAddr;
00764         fedHeader->eventid=(fedHeader->eventid&0xFF000000)+(evtNumber&0x00FFFFFF);
00765       }
00766       if (fedSize>0) evt->writeFed(fedId,fedAddr,fedSize);
00767     }
00768     delete event;
00769   }
00770   // RANDOM mode
00771   else {
00772     unsigned int evtNumber=(firstEvent_+evtNumber_++)%0x1000000;
00773     evt->initialize(evtNumber);
00774     unsigned int fedSizeMin=fedHeaderSize_+fedTrailerSize_;
00775     for (unsigned int i=0;i<validFedIds_.size();i++) {
00776       unsigned int fedId(validFedIds_[i]);
00777       unsigned int fedSize(fedSizeMean_);
00778       if (!useFixedFedSize_) {
00779         double logFedSize=CLHEP::RandGauss::shoot(gaussianMean_,gaussianWidth_);
00780         fedSize=(unsigned int)(std::exp(logFedSize));
00781         if (fedSize<fedSizeMin)  fedSize=fedSizeMin;
00782         if (fedSize>fedSizeMax_) fedSize=fedSizeMax_;
00783         fedSize-=fedSize%8;
00784       }
00785       
00786       evt->writeFed(fedId,0,fedSize);
00787       evt->writeFedHeader(i);
00788       evt->writeFedTrailer(i);
00789     }
00790     
00791   }
00792   return true;
00793 }
00794 
00795 
00796 //______________________________________________________________________________
00797 toolbox::mem::Reference *BU::createMsgChain(BUEvent* evt,
00798                                             unsigned int fuResourceId)
00799 {
00800   unsigned int msgHeaderSize =sizeof(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME);
00801   unsigned int msgPayloadSize=msgBufferSize_-msgHeaderSize;
00802 
00803   if((msgPayloadSize%4)!=0) LOG4CPLUS_ERROR(log_,"Invalid Payload Size.");
00804   
00805   toolbox::mem::Reference *head  =0;
00806   toolbox::mem::Reference *tail  =0;
00807   toolbox::mem::Reference *bufRef=0;
00808   
00809   I2O_MESSAGE_FRAME                  *stdMsg=0;
00810   I2O_PRIVATE_MESSAGE_FRAME          *pvtMsg=0;
00811   I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block =0;
00812   
00813   unsigned int iFed            =0;
00814   unsigned int nSuperFrag      =64;
00815   unsigned int nFedPerSuperFrag=validFedIds_.size()/nSuperFrag;
00816   unsigned int nBigSuperFrags  =validFedIds_.size()%nSuperFrag;
00817   
00818   if (evt->nFed()<nSuperFrag) {
00819     nSuperFrag=evt->nFed();
00820     nFedPerSuperFrag=1;
00821     nBigSuperFrags=0;
00822   }
00823   else
00824     {
00825       nFedPerSuperFrag=evt->nFed()/nSuperFrag;
00826       nBigSuperFrags  =evt->nFed()%nSuperFrag;
00827     }
00828   // loop over all super fragments
00829   for (unsigned int iSuperFrag=0;iSuperFrag<nSuperFrag;iSuperFrag++) {
00830     
00831     // compute index of last fed in this super fragment
00832     unsigned int nFed=iFed+nFedPerSuperFrag;
00833     if (iSuperFrag<nBigSuperFrags) ++nFed;
00834     
00835     // compute number of blocks in this super fragment
00836     unsigned int nBlock  =0;
00837     unsigned int curbSize=frlHeaderSize_;
00838     unsigned int totSize =curbSize;
00839     for (unsigned int i=iFed;i<nFed;i++) {
00840       curbSize+=evt->fedSize(i);
00841       totSize+=evt->fedSize(i);
00842       if (curbSize>msgPayloadSize) {
00843         curbSize+=frlHeaderSize_*(curbSize/msgPayloadSize);
00844         if(curbSize%msgPayloadSize)totSize+=frlHeaderSize_*(curbSize/msgPayloadSize);
00845         else totSize+=frlHeaderSize_*((curbSize/msgPayloadSize)-1);
00846         curbSize=curbSize%msgPayloadSize;
00847       }
00848     }
00849     nBlock=totSize/msgPayloadSize+(totSize%msgPayloadSize>0 ? 1 : 0);
00850     
00851     
00852     // loop over all blocks (msgs) in the current super fragment
00853     unsigned int   remainder     =0;
00854     bool           fedTrailerLeft=false;
00855     bool           last          =false;
00856     bool           warning       =false;
00857     unsigned char *startOfPayload=0;
00858     U32            payload(0);
00859     
00860     for(unsigned int iBlock=0;iBlock<nBlock;iBlock++) {
00861       
00862       // If last block and its partial (there can be only 0 or 1 partial)
00863       payload=msgPayloadSize;
00864       
00865       // Allocate memory for a fragment block / message
00866       try {
00867         bufRef=toolbox::mem::getMemoryPoolFactory()->getFrame(i2oPool_,
00868                                                               msgBufferSize_);
00869       }
00870       catch(xcept::Exception &e) {
00871         LOG4CPLUS_FATAL(log_,"xdaq::frameAlloc failed");
00872       }
00873       
00874       // Fill in the fields of the fragment block / message
00875       stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
00876       pvtMsg=(I2O_PRIVATE_MESSAGE_FRAME*)stdMsg;
00877       block =(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)stdMsg;
00878       
00879       pvtMsg->XFunctionCode   =I2O_FU_TAKE;
00880       pvtMsg->OrganizationID  =XDAQ_ORGANIZATION_ID;
00881       
00882       stdMsg->MessageSize     =(msgHeaderSize + payload) >> 2;
00883       stdMsg->Function        =I2O_PRIVATE_MESSAGE;
00884       stdMsg->VersionOffset   =0;
00885       stdMsg->MsgFlags        =0;
00886       stdMsg->InitiatorAddress=i2o::utils::getAddressMap()->getTid(buAppDesc_);
00887       stdMsg->TargetAddress   =i2o::utils::getAddressMap()->getTid(fuAppDesc_);
00888       
00889       block->buResourceId           =evt->buResourceId();
00890       block->fuTransactionId        =fuResourceId;
00891       block->blockNb                =iBlock;
00892       block->nbBlocksInSuperFragment=nBlock;
00893       block->superFragmentNb        =iSuperFrag;
00894       block->nbSuperFragmentsInEvent=nSuperFrag;
00895       block->eventNumber            =evt->evtNumber();
00896       
00897       // Fill in payload 
00898       startOfPayload   =(unsigned char*)block+msgHeaderSize;
00899       frlh_t* frlHeader=(frlh_t*)startOfPayload;
00900       frlHeader->trigno=evt->evtNumber();
00901       frlHeader->segno =iBlock;
00902       
00903       unsigned char *startOfFedBlocks=startOfPayload+frlHeaderSize_;
00904       payload              -=frlHeaderSize_;
00905       frlHeader->segsize    =payload;
00906       unsigned int leftspace=payload;
00907       
00908       // a fed trailer was left over from the previous block
00909       if(fedTrailerLeft) {
00910         memcpy(startOfFedBlocks,
00911                evt->fedAddr(iFed)+evt->fedSize(iFed)-fedTrailerSize_,
00912                fedTrailerSize_);
00913         
00914         startOfFedBlocks+=fedTrailerSize_;
00915         leftspace       -=fedTrailerSize_;
00916         remainder        =0;
00917         fedTrailerLeft   =false;
00918         
00919         // if this is the last fed, adjust block (msg) size and set last=true
00920         if((iFed==nFed-1) && !last) {
00921           frlHeader->segsize-=leftspace;
00922           int msgSize=stdMsg->MessageSize << 2;
00923           msgSize   -=leftspace;
00924           bufRef->setDataSize(msgSize);
00925           stdMsg->MessageSize = msgSize >> 2;
00926           frlHeader->segsize=frlHeader->segsize | FRL_LAST_SEGM;
00927           last=true;
00928         }
00929         
00930         // !! increment iFed !!
00931         iFed++;
00932       }
00933       
00937       if (remainder>0) {
00938         
00939         // the remaining fed fits entirely into the new block
00940         if(payload>=remainder) {
00941           memcpy(startOfFedBlocks,
00942                  evt->fedAddr(iFed)+evt->fedSize(iFed)-remainder,
00943                  remainder);
00944           
00945           startOfFedBlocks+=remainder;
00946           leftspace       -=remainder;
00947           
00948           // if this is the last fed in the superfragment, earmark it
00949           if(iFed==nFed-1) {
00950             frlHeader->segsize-=leftspace;
00951             int msgSize=stdMsg->MessageSize << 2;
00952             msgSize   -=leftspace;
00953             bufRef->setDataSize(msgSize);
00954             stdMsg->MessageSize = msgSize >> 2;
00955             frlHeader->segsize=frlHeader->segsize | FRL_LAST_SEGM;
00956             last=true;
00957           }
00958           
00959           // !! increment iFed !!
00960           iFed++;
00961           
00962           // start new fed -> set remainder to 0!
00963           remainder=0;
00964         }
00965         // the remaining payload fits, but not the fed trailer
00966         else if (payload>=(remainder-fedTrailerSize_)) {
00967           memcpy(startOfFedBlocks,
00968                  evt->fedAddr(iFed)+evt->fedSize(iFed)-remainder,
00969                  remainder-fedTrailerSize_);
00970           
00971           frlHeader->segsize=remainder-fedTrailerSize_;
00972           fedTrailerLeft    =true;
00973           leftspace        -=(remainder-fedTrailerSize_);
00974           remainder         =fedTrailerSize_;
00975         }
00976         // the remaining payload fits only partially, fill whole block
00977         else {
00978           memcpy(startOfFedBlocks,
00979                  evt->fedAddr(iFed)+evt->fedSize(iFed)-remainder,payload);
00980           remainder-=payload;
00981           leftspace =0;
00982         }
00983       }
00984       
00988       if(remainder==0) {
00989         
00990         // loop on feds
00991         while(iFed<nFed) {
00992           
00993           // if the next header does not fit, jump to following block
00994           if((int)leftspace<fedHeaderSize_) {
00995             frlHeader->segsize-=leftspace;
00996             break;
00997           }
00998           
00999           memcpy(startOfFedBlocks,evt->fedAddr(iFed),fedHeaderSize_);
01000           
01001           leftspace       -=fedHeaderSize_;
01002           startOfFedBlocks+=fedHeaderSize_;
01003           
01004           // fed fits with its trailer
01005           if(evt->fedSize(iFed)-fedHeaderSize_<=leftspace) {
01006             memcpy(startOfFedBlocks,
01007                    evt->fedAddr(iFed)+fedHeaderSize_,
01008                    evt->fedSize(iFed)-fedHeaderSize_);
01009             
01010             leftspace       -=(evt->fedSize(iFed)-fedHeaderSize_);
01011             startOfFedBlocks+=(evt->fedSize(iFed)-fedHeaderSize_);
01012           }
01013           // fed payload fits only without fed trailer
01014           else if(evt->fedSize(iFed)-fedHeaderSize_-fedTrailerSize_<=leftspace) {
01015             memcpy(startOfFedBlocks,
01016                    evt->fedAddr(iFed)+fedHeaderSize_,
01017                    evt->fedSize(iFed)-fedHeaderSize_-fedTrailerSize_);
01018             
01019             leftspace         -=(evt->fedSize(iFed)-fedHeaderSize_-fedTrailerSize_);
01020             frlHeader->segsize-=leftspace;
01021             fedTrailerLeft     =true;
01022             remainder          =fedTrailerSize_;
01023             
01024             break;
01025           }
01026           // fed payload fits only partially
01027           else {
01028             memcpy(startOfFedBlocks,evt->fedAddr(iFed)+fedHeaderSize_,leftspace);
01029             remainder=evt->fedSize(iFed)-fedHeaderSize_-leftspace;
01030             leftspace=0;
01031             
01032             break;
01033           }
01034           
01035           // !! increase iFed !!
01036           iFed++;
01037           
01038         } // while (iFed<nFed)
01039         
01040         // earmark the last block
01041         if (iFed==nFed && remainder==0 && !last) {
01042           frlHeader->segsize-=leftspace;
01043           int msgSize=stdMsg->MessageSize << 2;
01044           msgSize   -=leftspace;
01045           bufRef->setDataSize(msgSize);
01046           stdMsg->MessageSize=msgSize >> 2;
01047           frlHeader->segsize =frlHeader->segsize | FRL_LAST_SEGM;
01048           last=true;
01049         }
01050         
01051       } // if (remainder==0)
01052       
01053       if(iSuperFrag==0&&iBlock==0) { // This is the first fragment block / message
01054         head=bufRef;
01055         tail=bufRef;
01056       }
01057       else {
01058         tail->setNextReference(bufRef);
01059         tail=bufRef;
01060       }
01061       
01062       if((iBlock==nBlock-1) && remainder!=0) {
01063         nBlock++;
01064         warning=true;
01065       }
01066       
01067     } // for (iBlock)
01068     
01069     // fix case where block estimate was wrong
01070     if(warning) {
01071       toolbox::mem::Reference* next=head;
01072       do {
01073         block =(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)next->getDataLocation();
01074         if (block->superFragmentNb==iSuperFrag)
01075           block->nbBlocksInSuperFragment=nBlock;                
01076       } while((next=next->getNextReference()));
01077     }
01078   
01079   } // iSuperFrag < nSuperFrag
01080   
01081   return head; // return the top of the chain
01082 }
01083 
01084 //______________________________________________________________________________
01085 void BU::dumpFrame(unsigned char* data,unsigned int len)
01086 {
01087   char left1[20];
01088   char left2[20];
01089   char right1[20];
01090   char right2[20];
01091 
01092   printf("Byte  0  1  2  3  4  5  6  7\n");
01093   
01094   int c(0);
01095   int pos(0);
01096   
01097   for (unsigned int i=0;i<(len/8);i++) {
01098     int rpos(0);
01099     int off(3);
01100     for (pos=0;pos<12;pos+=3) {
01101       sprintf(&left1[pos],"%2.2x ",
01102               ((unsigned char*)data)[c+off]);
01103       sprintf(&right1[rpos],"%1c",
01104               ((data[c+off] > 32)&&(data[c+off] < 127)) ? data[c+off] : '.');
01105       sprintf (&left2[pos],"%2.2x ",
01106                ((unsigned char*)data)[c+off+4]);
01107       sprintf (&right2[rpos],"%1c",
01108                ((data[c+off+4] > 32)&&(data[c+off+4]<127)) ? data[c+off+4] : '.');
01109       rpos++;
01110       off--;
01111     }
01112     c+=8;
01113     
01114     printf ("%4d: %s%s ||  %s%s  %p\n",
01115             c-8, left1, left2, right1, right2, &data[c-8]);
01116   }
01117   
01118   fflush(stdout);       
01119 }
01120 
01121 
01123 // xdaq instantiator implementation macro
01125 
// Invokes the xdaq instantiator implementation macro for the BU class.
// NOTE(review): presumably this emits the factory hook through which the xdaq
// framework creates BU application instances -- confirm against the xdaq headers.
01126 XDAQ_INSTANTIATOR_IMPL(BU)