CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_4_5_patch3/src/EventFilter/AutoBU/src/BU.cc

Go to the documentation of this file.
00001 
00002 //
00003 // BU
00004 // --
00005 //
00006 //                                         Emilio Meschi <emilio.meschi@cern.ch>
00007 //                       Philipp Schieferdecker <philipp.schieferdecker@cern.ch>
00009 
00010 
00011 #include "EventFilter/AutoBU/interface/BU.h"
00012 
00013 #include "FWCore/Utilities/interface/CRC16.h"
00014 
00015 #include "DataFormats/FEDRawData/interface/FEDNumbering.h"
00016 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
00017 
00018 #include "xoap/SOAPEnvelope.h"
00019 #include "xoap/SOAPBody.h"
00020 #include "xoap/domutils.h"
00021 
00022 #include <netinet/in.h>
00023 #include <sstream>
00024 
00025 
00026 using namespace std;
00027 using namespace evf;
00028 
00029 
00031 // construction/destruction
00033 
00034 //______________________________________________________________________________
00035 BU::BU(xdaq::ApplicationStub *s) 
00036   : xdaq::Application(s)
00037   , log_(getApplicationLogger())
00038   , buAppDesc_(getApplicationDescriptor())
00039   , fuAppDesc_(0)
00040   , buAppContext_(getApplicationContext())
00041   , fsm_(this)
00042   , gui_(0)
00043   , evtNumber_(0)
00044   , isBuilding_(false)
00045   , isSending_(false)
00046   , isHalting_(false)
00047   , wlBuilding_(0)
00048   , asBuilding_(0)
00049   , wlSending_(0)
00050   , asSending_(0)
00051   , wlMonitoring_(0)
00052   , asMonitoring_(0)
00053   , instance_(0)
00054   , runNumber_(0)
00055   , memUsedInMB_(0.0)
00056   , deltaT_(0.0)
00057   , deltaN_(0)
00058   , deltaSumOfSquares_(0)
00059   , deltaSumOfSizes_(0)
00060   , throughput_(0.0)
00061   , average_(0.0)
00062   , rate_(0.0)
00063   , rms_(0.0)
00064   , nbEventsInBU_(0)
00065   , nbEventsRequested_(0)
00066   , nbEventsBuilt_(0)
00067   , nbEventsSent_(0)
00068   , nbEventsDiscarded_(0)
00069   , mode_("RANDOM")
00070   , replay_(false)
00071   , crc_(true)
00072   , overwriteEvtId_(true)
00073   , firstEvent_(1)
00074   , queueSize_(32)
00075   , eventBufferSize_(0x400000)
00076   , msgBufferSize_(32768)
00077   , fedSizeMax_(65536)
00078   , fedSizeMean_(1024)
00079   , fedSizeWidth_(1024)
00080   , useFixedFedSize_(false)
00081   , monSleepSec_(1)
00082   , gaussianMean_(0.0)
00083   , gaussianWidth_(1.0)
00084   , monLastN_(0)
00085   , monLastSumOfSquares_(0)
00086   , monLastSumOfSizes_(0)
00087   , sumOfSquares_(0)
00088   , sumOfSizes_(0)
00089   , i2oPool_(0)
00090 {
00091   // initialize state machine
00092   fsm_.initialize<evf::BU>(this);
00093   
00094   // initialize application info
00095   url_     =
00096     getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00097     getApplicationDescriptor()->getURN();
00098   class_   =getApplicationDescriptor()->getClassName();
00099   instance_=getApplicationDescriptor()->getInstance();
00100   hostname_=getApplicationDescriptor()->getContextDescriptor()->getURL();
00101   sourceId_=class_.toString()+instance_.toString();
00102   
00103   // i2o callbacks
00104   i2o::bind(this,&BU::I2O_BU_ALLOCATE_Callback,I2O_BU_ALLOCATE,XDAQ_ORGANIZATION_ID);
00105   i2o::bind(this,&BU::I2O_BU_DISCARD_Callback, I2O_BU_DISCARD, XDAQ_ORGANIZATION_ID);
00106   
00107   // allocate i2o memery pool
00108   string i2oPoolName=sourceId_+"_i2oPool";
00109   try {
00110     toolbox::mem::HeapAllocator *allocator=new toolbox::mem::HeapAllocator();
00111     toolbox::net::URN urn("toolbox-mem-pool",i2oPoolName);
00112     toolbox::mem::MemoryPoolFactory* poolFactory=
00113       toolbox::mem::getMemoryPoolFactory();
00114     i2oPool_=poolFactory->createPool(urn,allocator);
00115   }
00116   catch (toolbox::mem::exception::Exception& e) {
00117     string s="Failed to create pool: "+i2oPoolName;
00118     LOG4CPLUS_FATAL(log_,s);
00119     XCEPT_RETHROW(xcept::Exception,s,e);
00120   }
00121   
00122   // web interface
00123   xgi::bind(this,&evf::BU::webPageRequest,"Default");
00124   gui_=new WebGUI(this,&fsm_);
00125   gui_->setSmallAppIcon("/rubuilder/bu/images/bu32x32.gif");
00126   gui_->setLargeAppIcon("/rubuilder/bu/images/bu64x64.gif");
00127   
00128   vector<toolbox::lang::Method*> methods=gui_->getMethods();
00129   vector<toolbox::lang::Method*>::iterator it;
00130   for (it=methods.begin();it!=methods.end();++it) {
00131     if ((*it)->type()=="cgi") {
00132       string name=static_cast<xgi::MethodSignature*>(*it)->name();
00133       xgi::bind(this,&evf::BU::webPageRequest,name);
00134     }
00135   }
00136   xgi::bind(this,&evf::BU::customWebPage,"customWebPage");
00137   
00138   
00139   // export parameters to info space(s)
00140   exportParameters();
00141 
00142   // findRcmsStateListener
00143   fsm_.findRcmsStateListener();
00144   
00145   // compute parameters for fed size generation (a la Emilio)
00146   gaussianMean_ =std::log((double)fedSizeMean_);
00147   gaussianWidth_=std::sqrt(std::log
00148                            (0.5*
00149                             (1+std::sqrt
00150                              (1.0+4.0*
00151                               fedSizeWidth_.value_*fedSizeWidth_.value_/
00152                               fedSizeMean_.value_/fedSizeMean_.value_))));
00153 
00154   // start monitoring thread, once and for all
00155   startMonitoringWorkLoop();
00156   
00157   // propagate crc flag to BUEvent
00158   BUEvent::setComputeCrc(crc_.value_);
00159 }
00160 
00161 
00162 //______________________________________________________________________________
00163 BU::~BU()
00164 {
00165   while (!events_.empty()) { delete events_.back(); events_.pop_back(); }
00166 }
00167 
00168 
00170 // implementation of member functions
00172 
00173 //______________________________________________________________________________
00174 bool BU::configuring(toolbox::task::WorkLoop* wl)
00175 {
00176   isHalting_=false;
00177   try {
00178     LOG4CPLUS_INFO(log_,"Start configuring ...");
00179     reset();
00180     LOG4CPLUS_INFO(log_,"Finished configuring!");
00181     fsm_.fireEvent("ConfigureDone",this);
00182   }
00183   catch (xcept::Exception &e) {
00184     string msg = "configuring FAILED: " + (string)e.what();
00185     fsm_.fireFailed(msg,this);
00186   }
00187 
00188   return false;
00189 }
00190 
00191 
00192 //______________________________________________________________________________
00193 bool BU::enabling(toolbox::task::WorkLoop* wl)
00194 {
00195   isHalting_=false;
00196   try {
00197     LOG4CPLUS_INFO(log_,"Start enabling ...");
00198     // determine valid fed ids (assumes Playback EP is already configured hence PBRDP::instance 
00199     // not null in case we are playing back)
00200     if (0!=PlaybackRawDataProvider::instance()) {
00201       for (unsigned int i=0;i<(unsigned int)FEDNumbering::MAXFEDID+1;i++)
00202         if (FEDNumbering::inRange(i)) validFedIds_.push_back(i);
00203     }
00204     else{
00205       for (unsigned int i=0;i<(unsigned int)FEDNumbering::MAXFEDID+1;i++)
00206         if (FEDNumbering::inRangeNoGT(i)) validFedIds_.push_back(i);
00207     }
00208     if (!isBuilding_) startBuildingWorkLoop();
00209     if (!isSending_)  startSendingWorkLoop();
00210     LOG4CPLUS_INFO(log_,"Finished enabling!");
00211     fsm_.fireEvent("EnableDone",this);
00212   }
00213   catch (xcept::Exception &e) {
00214     string msg = "enabling FAILED: " + (string)e.what();
00215     fsm_.fireFailed(msg,this);
00216   }
00217   
00218   return false;
00219 }
00220 
00221 
00222 //______________________________________________________________________________
00223 bool BU::stopping(toolbox::task::WorkLoop* wl)
00224 {
00225   try {
00226     LOG4CPLUS_INFO(log_,"Start stopping :) ...");
00227 
00228     if (0!=PlaybackRawDataProvider::instance()&&
00229         (!replay_.value_||nbEventsBuilt_<(uint32_t)events_.size())) { 
00230       lock();
00231       freeIds_.push(events_.size()); 
00232       unlock();
00233       postBuild();
00234       while (!builtIds_.empty()) {
00235         LOG4CPLUS_INFO(log_,"wait to flush ... #builtIds="<<builtIds_.size());
00236 	::sleep(1);
00237       }
00238       // let the playback go to the last event and exit
00239       PlaybackRawDataProvider::instance()->setFreeToEof(); 
00240     }
00241     
00242     lock();
00243     builtIds_.push(events_.size());
00244     unlock();
00245 
00246     postSend();
00247     while (!sentIds_.empty()) {
00248       LOG4CPLUS_INFO(log_,"wait to flush ...");
00249       ::sleep(1);
00250     }
00251     if (0!=PlaybackRawDataProvider::instance()&&
00252         (replay_.value_&&nbEventsBuilt_>=(uint32_t)events_.size())) {
00253       lock();
00254       freeIds_.push(events_.size());
00255       unlock();
00256       postBuild();
00257     }
00258     LOG4CPLUS_INFO(log_,"Finished stopping!");
00259     fsm_.fireEvent("StopDone",this);
00260   }
00261   catch (xcept::Exception &e) {
00262     string msg = "stopping FAILED: " + (string)e.what();
00263     fsm_.fireFailed(msg,this);
00264   }
00265   return false;
00266 }
00267 
00268 
00269 //______________________________________________________________________________
00270 bool BU::halting(toolbox::task::WorkLoop* wl)
00271 {
00272   try {
00273     LOG4CPLUS_INFO(log_,"Start halting ...");
00274     isHalting_=true;
00275     if (isBuilding_&&isSending_) {
00276       lock();
00277       freeIds_.push(events_.size());
00278       builtIds_.push(events_.size());
00279       unlock();
00280       postBuild();
00281       postSend();
00282     }
00283     LOG4CPLUS_INFO(log_,"Finished halting!");
00284     fsm_.fireEvent("HaltDone",this);
00285   }
00286   catch (xcept::Exception &e) {
00287     string msg = "halting FAILED: " + (string)e.what();
00288     fsm_.fireFailed(msg,this);
00289   }
00290   
00291   return false;
00292 }
00293 
00294 
00295 //______________________________________________________________________________
00296 xoap::MessageReference BU::fsmCallback(xoap::MessageReference msg)
00297   throw (xoap::exception::Exception)
00298 {
00299   return fsm_.commandCallback(msg);
00300 }
00301 
00302 
00303 //______________________________________________________________________________
00304 void BU::I2O_BU_ALLOCATE_Callback(toolbox::mem::Reference *bufRef)
00305 {
00306   if (isHalting_) {
00307     LOG4CPLUS_WARN(log_,"Ignore BU_ALLOCATE message while halting.");
00308     bufRef->release();
00309     return;
00310   }
00311   
00312   I2O_MESSAGE_FRAME             *stdMsg;
00313   I2O_BU_ALLOCATE_MESSAGE_FRAME *msg;
00314   
00315   stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
00316   msg   =(I2O_BU_ALLOCATE_MESSAGE_FRAME*)stdMsg;
00317   
00318   if (0==fuAppDesc_) {
00319     I2O_TID fuTid=stdMsg->InitiatorAddress;
00320     fuAppDesc_=i2o::utils::getAddressMap()->getApplicationDescriptor(fuTid);
00321   }
00322   
00323   for (unsigned int i=0;i<msg->n;i++) {
00324     unsigned int fuResourceId=msg->allocate[i].fuTransactionId;
00325     lock();
00326     rqstIds_.push(fuResourceId);
00327     postRqst();
00328     nbEventsRequested_++;
00329     nbEventsInBU_++;
00330     unlock();
00331   }
00332 
00333   bufRef->release();
00334 }
00335 
00336 
00337 //______________________________________________________________________________
00338 void BU::I2O_BU_DISCARD_Callback(toolbox::mem::Reference *bufRef)
00339 {
00340   if (isHalting_) {
00341     LOG4CPLUS_WARN(log_,"Ignore BU_DISCARD message while halting.");
00342     bufRef->release();
00343     return;
00344   }
00345 
00346   I2O_MESSAGE_FRAME           *stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
00347   I2O_BU_DISCARD_MESSAGE_FRAME*msg   =(I2O_BU_DISCARD_MESSAGE_FRAME*)stdMsg;
00348   unsigned int buResourceId=msg->buResourceId[0];
00349 
00350   lock();
00351   int result=sentIds_.erase(buResourceId);
00352   unlock();
00353   
00354   if (!result) {
00355     LOG4CPLUS_ERROR(log_,"can't discard unknown buResourceId '"<<buResourceId<<"'");
00356   }
00357   else {
00358     lock();
00359     freeIds_.push(buResourceId);
00360     nbEventsDiscarded_.value_++;
00361     unlock();
00362     postBuild();
00363   }
00364   
00365   bufRef->release();
00366 }
00367 
00368 
00369 //______________________________________________________________________________
00370 void BU::actionPerformed(xdata::Event& e)
00371 {
00372   gui_->monInfoSpace()->lock();
00373   if (e.type()=="urn:xdata-event:ItemGroupRetrieveEvent") {
00374     mode_=(0==PlaybackRawDataProvider::instance())?"RANDOM":"PLAYBACK";
00375     if (0!=i2oPool_) memUsedInMB_=i2oPool_->getMemoryUsage().getUsed()*9.53674e-07;
00376     else             memUsedInMB_=0.0;
00377   }
00378   else if (e.type()=="ItemChangedEvent") {
00379     string item=dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00380     if (item=="crc") BUEvent::setComputeCrc(crc_.value_);
00381   }
00382   gui_->monInfoSpace()->unlock();
00383 }
00384 
00385 
00386 //______________________________________________________________________________
00387 void BU::webPageRequest(xgi::Input *in,xgi::Output *out)
00388   throw (xgi::exception::Exception)
00389 {
00390   string name=in->getenv("PATH_INFO");
00391   if (name.empty()) name="defaultWebPage";
00392   static_cast<xgi::MethodSignature*>(gui_->getMethod(name))->invoke(in,out);
00393 }
00394 
00395 
00396 //______________________________________________________________________________
00397 void BU::customWebPage(xgi::Input*in,xgi::Output*out)
00398   throw (xgi::exception::Exception)
00399 {
00400   *out<<"<html></html>"<<endl;
00401 }
00402 
00403 
00404 //______________________________________________________________________________
00405 void BU::startBuildingWorkLoop() throw (evf::Exception)
00406 {
00407   try {
00408     LOG4CPLUS_INFO(log_,"Start 'building' workloop");
00409     wlBuilding_=
00410       toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+
00411                                                        "Building",
00412                                                        "waiting");
00413     if (!wlBuilding_->isActive()) wlBuilding_->activate();
00414     asBuilding_=toolbox::task::bind(this,&BU::building,sourceId_+"Building");
00415     wlBuilding_->submit(asBuilding_);
00416     isBuilding_=true;
00417   }
00418   catch (xcept::Exception& e) {
00419     string msg = "Failed to start workloop 'building'.";
00420     XCEPT_RETHROW(evf::Exception,msg,e);
00421   }
00422 }
00423 
00424 
00425 //______________________________________________________________________________
00426 bool BU::building(toolbox::task::WorkLoop* wl)
00427 {
00428   waitBuild();
00429   lock();
00430   unsigned int buResourceId=freeIds_.front(); freeIds_.pop();
00431   unlock();
00432   
00433   if (buResourceId>=(uint32_t)events_.size()) {
00434     LOG4CPLUS_INFO(log_,"shutdown 'building' workloop.");
00435     isBuilding_=false;
00436     return false;
00437   }
00438   
00439   if (!isHalting_) {
00440     BUEvent* evt=events_[buResourceId];
00441     if(generateEvent(evt)) {
00442       lock();
00443       nbEventsBuilt_++;
00444       builtIds_.push(buResourceId);
00445       unlock();
00446       
00447       postSend();
00448     }
00449     else {
00450       LOG4CPLUS_INFO(log_,"building:received null post");
00451       lock();
00452       unsigned int saveBUResourceId = buResourceId;
00453       //buResourceId = freeIds_.front(); freeIds_.pop();
00454       freeIds_.push(saveBUResourceId);
00455       unlock();
00456       isBuilding_=false;
00457       return false;
00458     }
00459   }
00460   return true;
00461 }
00462 
00463 
00464 //______________________________________________________________________________
00465 void BU::startSendingWorkLoop() throw (evf::Exception)
00466 {
00467   try {
00468     LOG4CPLUS_INFO(log_,"Start 'sending' workloop");
00469     wlSending_=toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+
00470                                                                 "Sending",
00471                                                                 "waiting");
00472     if (!wlSending_->isActive()) wlSending_->activate();
00473     asSending_=toolbox::task::bind(this,&BU::sending,sourceId_+"Sending");
00474     wlSending_->submit(asSending_);
00475     isSending_=true;
00476   }
00477   catch (xcept::Exception& e) {
00478     string msg = "Failed to start workloop 'sending'.";
00479     XCEPT_RETHROW(evf::Exception,msg,e);
00480   }
00481 }
00482 
00483 
00484 //______________________________________________________________________________
00485 bool BU::sending(toolbox::task::WorkLoop* wl)
00486 {
00487   waitSend();
00488   lock();
00489   unsigned int buResourceId=builtIds_.front(); builtIds_.pop();
00490   unlock();
00491   
00492   if (buResourceId>=(uint32_t)events_.size()) {
00493     LOG4CPLUS_INFO(log_,"shutdown 'sending' workloop.");
00494     isSending_=false;
00495     return false;
00496   }
00497 
00498   if (!isHalting_) {
00499     waitRqst();
00500     lock();
00501     unsigned int fuResourceId=rqstIds_.front(); rqstIds_.pop();
00502     unlock();
00503     
00504     BUEvent* evt=events_[buResourceId];
00505     toolbox::mem::Reference* msg=createMsgChain(evt,fuResourceId);
00506     
00507     lock();
00508     sumOfSquares_+=(uint64_t)evt->evtSize()*(uint64_t)evt->evtSize();
00509     sumOfSizes_  +=evt->evtSize();
00510     nbEventsInBU_--;
00511     nbEventsSent_++;
00512     sentIds_.insert(buResourceId);
00513     unlock();
00514     
00515     buAppContext_->postFrame(msg,buAppDesc_,fuAppDesc_);  
00516   }
00517   
00518   return true;
00519 }
00520 
00521 
00522 //______________________________________________________________________________
00523 void BU::startMonitoringWorkLoop() throw (evf::Exception)
00524 {
00525   struct timezone timezone;
00526   gettimeofday(&monStartTime_,&timezone);
00527   
00528   try {
00529     LOG4CPLUS_INFO(log_,"Start 'monitoring' workloop");
00530     wlMonitoring_=
00531       toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+
00532                                                        "Monitoring",
00533                                                        "waiting");
00534     if (!wlMonitoring_->isActive()) wlMonitoring_->activate();
00535     asMonitoring_=toolbox::task::bind(this,&BU::monitoring,sourceId_+"Monitoring");
00536     wlMonitoring_->submit(asMonitoring_);
00537   }
00538   catch (xcept::Exception& e) {
00539     string msg = "Failed to start workloop 'monitoring'.";
00540     XCEPT_RETHROW(evf::Exception,msg,e);
00541   }
00542 }
00543 
00544 
00545 //______________________________________________________________________________
00546 bool BU::monitoring(toolbox::task::WorkLoop* wl)
00547 {
00548   struct timeval  monEndTime;
00549   struct timezone timezone;
00550   
00551   gettimeofday(&monEndTime,&timezone);
00552   
00553   lock();
00554   unsigned int monN           =nbEventsBuilt_.value_;
00555   uint64_t     monSumOfSquares=sumOfSquares_;
00556   unsigned int monSumOfSizes  =sumOfSizes_;
00557   uint64_t     deltaSumOfSquares;
00558   unlock();
00559   
00560   gui_->monInfoSpace()->lock();
00561   
00562   deltaT_.value_=deltaT(&monStartTime_,&monEndTime);
00563   monStartTime_=monEndTime;
00564   
00565   deltaN_.value_=monN-monLastN_;
00566   monLastN_=monN;
00567 
00568   deltaSumOfSquares=monSumOfSquares-monLastSumOfSquares_;
00569   deltaSumOfSquares_.value_=(double)deltaSumOfSquares;
00570   monLastSumOfSquares_=monSumOfSquares;
00571   
00572   deltaSumOfSizes_.value_=monSumOfSizes-monLastSumOfSizes_;
00573   monLastSumOfSizes_=monSumOfSizes;
00574   
00575   if (deltaT_.value_!=0) {
00576     throughput_=deltaSumOfSizes_.value_/deltaT_.value_;
00577     rate_      =deltaN_.value_/deltaT_.value_;
00578   }
00579   else {
00580     throughput_=0.0;
00581     rate_      =0.0;
00582   }
00583   
00584   double meanOfSquares,mean,squareOfMean,variance;
00585   
00586   if(deltaN_.value_!=0) {
00587     meanOfSquares=deltaSumOfSquares_.value_/((double)(deltaN_.value_));
00588     mean=((double)(deltaSumOfSizes_.value_))/((double)(deltaN_.value_));
00589     squareOfMean=mean*mean;
00590     variance=meanOfSquares-squareOfMean;
00591     average_=deltaSumOfSizes_.value_/deltaN_.value_;
00592     rms_    =std::sqrt(variance);
00593   }
00594   else {
00595     average_=0.0;
00596     rms_    =0.0;
00597   }
00598 
00599   gui_->monInfoSpace()->unlock();
00600   
00601   ::sleep(monSleepSec_.value_);
00602 
00603   return true;
00604 }
00605     
00606 
00607 
00609 // implementation of private member functions
00611 
00612 //______________________________________________________________________________
00613 void BU::exportParameters()
00614 {
00615   if (0==gui_) {
00616     LOG4CPLUS_ERROR(log_,"No GUI, can't export parameters");
00617     return;
00618   }
00619   
00620   gui_->addMonitorParam("url",                &url_);
00621   gui_->addMonitorParam("class",              &class_);
00622   gui_->addMonitorParam("instance",           &instance_);
00623   gui_->addMonitorParam("hostname",           &hostname_);
00624   gui_->addMonitorParam("runNumber",          &runNumber_);
00625   gui_->addMonitorParam("stateName",          fsm_.stateName());
00626   gui_->addMonitorParam("memUsedInMB",        &memUsedInMB_);
00627   gui_->addMonitorParam("deltaT",             &deltaT_);
00628   gui_->addMonitorParam("deltaN",             &deltaN_);
00629   gui_->addMonitorParam("deltaSumOfSquares",  &deltaSumOfSquares_);
00630   gui_->addMonitorParam("deltaSumOfSizes",    &deltaSumOfSizes_);
00631   gui_->addMonitorParam("throughput",         &throughput_);
00632   gui_->addMonitorParam("average",            &average_);
00633   gui_->addMonitorParam("rate",               &rate_);
00634   gui_->addMonitorParam("rms",                &rms_);
00635 
00636   gui_->addMonitorCounter("nbEvtsInBU",       &nbEventsInBU_);
00637   gui_->addMonitorCounter("nbEvtsRequested",  &nbEventsRequested_);
00638   gui_->addMonitorCounter("nbEvtsBuilt",      &nbEventsBuilt_);
00639   gui_->addMonitorCounter("nbEvtsSent",       &nbEventsSent_);
00640   gui_->addMonitorCounter("nbEvtsDiscarded",  &nbEventsDiscarded_);
00641 
00642   gui_->addStandardParam("mode",              &mode_);
00643   gui_->addStandardParam("replay",            &replay_);
00644   gui_->addStandardParam("overwriteEvtId",    &overwriteEvtId_);
00645   gui_->addStandardParam("crc",               &crc_);
00646   gui_->addStandardParam("firstEvent",        &firstEvent_);
00647   gui_->addStandardParam("queueSize",         &queueSize_);
00648   gui_->addStandardParam("eventBufferSize",   &eventBufferSize_);
00649   gui_->addStandardParam("msgBufferSize",     &msgBufferSize_);
00650   gui_->addStandardParam("fedSizeMax",        &fedSizeMax_);
00651   gui_->addStandardParam("fedSizeMean",       &fedSizeMean_);
00652   gui_->addStandardParam("fedSizeWidth",      &fedSizeWidth_);
00653   gui_->addStandardParam("useFixedFedSize",   &useFixedFedSize_);
00654   gui_->addStandardParam("monSleepSec",       &monSleepSec_);
00655   gui_->addStandardParam("rcmsStateListener",     fsm_.rcmsStateListener());
00656   gui_->addStandardParam("foundRcmsStateListener",fsm_.foundRcmsStateListener());
00657 
00658   
00659   gui_->exportParameters();
00660 
00661   gui_->addItemChangedListener("crc",this);
00662   
00663 }
00664 
00665 
00666 //______________________________________________________________________________
00667 void BU::reset()
00668 {
00669   gui_->resetCounters();
00670   
00671   deltaT_             =0.0;
00672   deltaN_             =  0;
00673   deltaSumOfSquares_  =  0;
00674   deltaSumOfSizes_    =  0;
00675   
00676   throughput_         =0.0;
00677   average_            =  0;
00678   rate_               =  0;
00679   rms_                =  0;
00680 
00681   monLastN_           =  0;
00682   monLastSumOfSquares_=  0;
00683   monLastSumOfSizes_  =  0;
00684   
00685   while (events_.size()) {
00686     delete events_.back();
00687     events_.pop_back();
00688   }
00689   
00690   while (!rqstIds_.empty())  rqstIds_.pop();
00691   while (!freeIds_.empty())  freeIds_.pop();
00692   while (!builtIds_.empty()) builtIds_.pop();
00693   sentIds_.clear();
00694   
00695   sem_init(&lock_,0,1);
00696   sem_init(&buildSem_,0,queueSize_);
00697   sem_init(&sendSem_,0,0);
00698   sem_init(&rqstSem_,0,0);
00699   
00700   for (unsigned int i=0;i<queueSize_;i++) {
00701     events_.push_back(new BUEvent(i,eventBufferSize_));
00702     freeIds_.push(i);
00703   }
00704 }
00705 
00706 
00707 //______________________________________________________________________________
00708 double BU::deltaT(const struct timeval *start,const struct timeval *end)
00709 {
00710   unsigned int  sec;
00711   unsigned int  usec;
00712   
00713   sec = end->tv_sec - start->tv_sec;
00714   
00715   if(end->tv_usec > start->tv_usec) {
00716     usec = end->tv_usec - start->tv_usec;
00717   }
00718   else {
00719     sec--;
00720     usec = 1000000 - ((unsigned int )(start->tv_usec - end->tv_usec));
00721   }
00722   
00723   return ((double)sec) + ((double)usec) / 1000000.0;
00724 }
00725 
00726 
00727 //______________________________________________________________________________
00728 bool BU::generateEvent(BUEvent* evt)
00729 {
00730   // replay?
00731   if (replay_.value_&&nbEventsBuilt_>=(uint32_t)events_.size()) 
00732     {
00733       PlaybackRawDataProvider::instance()->setFreeToEof();
00734       return true;
00735     }  
00736   // PLAYBACK mode
00737   if (0!=PlaybackRawDataProvider::instance()) {
00738     
00739     unsigned int runNumber,evtNumber;
00740 
00741     FEDRawDataCollection* event=
00742       PlaybackRawDataProvider::instance()->getFEDRawData(runNumber,evtNumber);
00743     if(event == 0) return false;
00744     evt->initialize(evtNumber);
00745     
00746     for (unsigned int i=0;i<validFedIds_.size();i++) {
00747       unsigned int   fedId  =validFedIds_[i];
00748       unsigned int   fedSize=event->FEDData(fedId).size();
00749       unsigned char* fedAddr=event->FEDData(fedId).data();
00750       if (overwriteEvtId_.value_ && fedAddr != 0) {
00751         fedh_t *fedHeader=(fedh_t*)fedAddr;
00752         fedHeader->eventid=(fedHeader->eventid&0xFF000000)+(evtNumber&0x00FFFFFF);
00753       }
00754       if (fedSize>0) evt->writeFed(fedId,fedAddr,fedSize);
00755     }
00756     delete event;
00757   }
00758   // RANDOM mode
00759   else {
00760     unsigned int evtNumber=(firstEvent_+evtNumber_++)%0x1000000;
00761     evt->initialize(evtNumber);
00762     unsigned int fedSizeMin=fedHeaderSize_+fedTrailerSize_;
00763     for (unsigned int i=0;i<validFedIds_.size();i++) {
00764       unsigned int fedId(validFedIds_[i]);
00765       unsigned int fedSize(fedSizeMean_);
00766       if (!useFixedFedSize_) {
00767         double logFedSize=CLHEP::RandGauss::shoot(gaussianMean_,gaussianWidth_);
00768         fedSize=(unsigned int)(std::exp(logFedSize));
00769         if (fedSize<fedSizeMin)  fedSize=fedSizeMin;
00770         if (fedSize>fedSizeMax_) fedSize=fedSizeMax_;
00771         fedSize-=fedSize%8;
00772       }
00773       
00774       evt->writeFed(fedId,0,fedSize);
00775       evt->writeFedHeader(i);
00776       evt->writeFedTrailer(i);
00777     }
00778     
00779   }
00780   return true;
00781 }
00782 
00783 
00784 //______________________________________________________________________________
00785 toolbox::mem::Reference *BU::createMsgChain(BUEvent* evt,
00786                                             unsigned int fuResourceId)
00787 {
00788   unsigned int msgHeaderSize =sizeof(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME);
00789   unsigned int msgPayloadSize=msgBufferSize_-msgHeaderSize;
00790 
00791   if((msgPayloadSize%4)!=0) LOG4CPLUS_ERROR(log_,"Invalid Payload Size.");
00792   
00793   toolbox::mem::Reference *head  =0;
00794   toolbox::mem::Reference *tail  =0;
00795   toolbox::mem::Reference *bufRef=0;
00796   
00797   I2O_MESSAGE_FRAME                  *stdMsg=0;
00798   I2O_PRIVATE_MESSAGE_FRAME          *pvtMsg=0;
00799   I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block =0;
00800   
00801   unsigned int iFed            =0;
00802   unsigned int nSuperFrag      =64;
00803   unsigned int nFedPerSuperFrag=validFedIds_.size()/nSuperFrag;
00804   unsigned int nBigSuperFrags  =validFedIds_.size()%nSuperFrag;
00805   
00806   if (evt->nFed()<nSuperFrag) {
00807     nSuperFrag=evt->nFed();
00808     nFedPerSuperFrag=1;
00809     nBigSuperFrags=0;
00810   }
00811   else
00812     {
00813       nFedPerSuperFrag=evt->nFed()/nSuperFrag;
00814       nBigSuperFrags  =evt->nFed()%nSuperFrag;
00815     }
00816   // loop over all super fragments
00817   for (unsigned int iSuperFrag=0;iSuperFrag<nSuperFrag;iSuperFrag++) {
00818     
00819     // compute index of last fed in this super fragment
00820     unsigned int nFed=iFed+nFedPerSuperFrag;
00821     if (iSuperFrag<nBigSuperFrags) ++nFed;
00822     
00823     // compute number of blocks in this super fragment
00824     unsigned int nBlock  =0;
00825     unsigned int curbSize=frlHeaderSize_;
00826     unsigned int totSize =curbSize;
00827     for (unsigned int i=iFed;i<nFed;i++) {
00828       curbSize+=evt->fedSize(i);
00829       totSize+=evt->fedSize(i);
00830       if (curbSize>msgPayloadSize) {
00831         curbSize+=frlHeaderSize_*(curbSize/msgPayloadSize);
00832         if(curbSize%msgPayloadSize)totSize+=frlHeaderSize_*(curbSize/msgPayloadSize);
00833         else totSize+=frlHeaderSize_*((curbSize/msgPayloadSize)-1);
00834         curbSize=curbSize%msgPayloadSize;
00835       }
00836     }
00837     nBlock=totSize/msgPayloadSize+(totSize%msgPayloadSize>0 ? 1 : 0);
00838     
00839     
00840     // loop over all blocks (msgs) in the current super fragment
00841     unsigned int   remainder     =0;
00842     bool           fedTrailerLeft=false;
00843     bool           last          =false;
00844     bool           warning       =false;
00845     unsigned char *startOfPayload=0;
00846     U32            payload(0);
00847     
00848     for(unsigned int iBlock=0;iBlock<nBlock;iBlock++) {
00849       
00850       // If last block and its partial (there can be only 0 or 1 partial)
00851       payload=msgPayloadSize;
00852       
00853       // Allocate memory for a fragment block / message
00854       try {
00855         bufRef=toolbox::mem::getMemoryPoolFactory()->getFrame(i2oPool_,
00856                                                               msgBufferSize_);
00857       }
00858       catch(xcept::Exception &e) {
00859         LOG4CPLUS_FATAL(log_,"xdaq::frameAlloc failed");
00860       }
00861       
00862       // Fill in the fields of the fragment block / message
00863       stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
00864       pvtMsg=(I2O_PRIVATE_MESSAGE_FRAME*)stdMsg;
00865       block =(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)stdMsg;
00866       
00867       pvtMsg->XFunctionCode   =I2O_FU_TAKE;
00868       pvtMsg->OrganizationID  =XDAQ_ORGANIZATION_ID;
00869       
00870       stdMsg->MessageSize     =(msgHeaderSize + payload) >> 2;
00871       stdMsg->Function        =I2O_PRIVATE_MESSAGE;
00872       stdMsg->VersionOffset   =0;
00873       stdMsg->MsgFlags        =0;
00874       stdMsg->InitiatorAddress=i2o::utils::getAddressMap()->getTid(buAppDesc_);
00875       stdMsg->TargetAddress   =i2o::utils::getAddressMap()->getTid(fuAppDesc_);
00876       
00877       block->buResourceId           =evt->buResourceId();
00878       block->fuTransactionId        =fuResourceId;
00879       block->blockNb                =iBlock;
00880       block->nbBlocksInSuperFragment=nBlock;
00881       block->superFragmentNb        =iSuperFrag;
00882       block->nbSuperFragmentsInEvent=nSuperFrag;
00883       block->eventNumber            =evt->evtNumber();
00884       
00885       // Fill in payload 
00886       startOfPayload   =(unsigned char*)block+msgHeaderSize;
00887       frlh_t* frlHeader=(frlh_t*)startOfPayload;
00888       frlHeader->trigno=evt->evtNumber();
00889       frlHeader->segno =iBlock;
00890       
00891       unsigned char *startOfFedBlocks=startOfPayload+frlHeaderSize_;
00892       payload              -=frlHeaderSize_;
00893       frlHeader->segsize    =payload;
00894       unsigned int leftspace=payload;
00895       
00896       // a fed trailer was left over from the previous block
00897       if(fedTrailerLeft) {
00898         memcpy(startOfFedBlocks,
00899                evt->fedAddr(iFed)+evt->fedSize(iFed)-fedTrailerSize_,
00900                fedTrailerSize_);
00901         
00902         startOfFedBlocks+=fedTrailerSize_;
00903         leftspace       -=fedTrailerSize_;
00904         remainder        =0;
00905         fedTrailerLeft   =false;
00906         
00907         // if this is the last fed, adjust block (msg) size and set last=true
00908         if((iFed==nFed-1) && !last) {
00909           frlHeader->segsize-=leftspace;
00910           int msgSize=stdMsg->MessageSize << 2;
00911           msgSize   -=leftspace;
00912           bufRef->setDataSize(msgSize);
00913           stdMsg->MessageSize = msgSize >> 2;
00914           frlHeader->segsize=frlHeader->segsize | FRL_LAST_SEGM;
00915           last=true;
00916         }
00917         
00918         // !! increment iFed !!
00919         iFed++;
00920       }
00921       
00925       if (remainder>0) {
00926         
00927         // the remaining fed fits entirely into the new block
00928         if(payload>=remainder) {
00929           memcpy(startOfFedBlocks,
00930                  evt->fedAddr(iFed)+evt->fedSize(iFed)-remainder,
00931                  remainder);
00932           
00933           startOfFedBlocks+=remainder;
00934           leftspace       -=remainder;
00935           
00936           // if this is the last fed in the superfragment, earmark it
00937           if(iFed==nFed-1) {
00938             frlHeader->segsize-=leftspace;
00939             int msgSize=stdMsg->MessageSize << 2;
00940             msgSize   -=leftspace;
00941             bufRef->setDataSize(msgSize);
00942             stdMsg->MessageSize = msgSize >> 2;
00943             frlHeader->segsize=frlHeader->segsize | FRL_LAST_SEGM;
00944             last=true;
00945           }
00946           
00947           // !! increment iFed !!
00948           iFed++;
00949           
00950           // start new fed -> set remainder to 0!
00951           remainder=0;
00952         }
00953         // the remaining payload fits, but not the fed trailer
00954         else if (payload>=(remainder-fedTrailerSize_)) {
00955           memcpy(startOfFedBlocks,
00956                  evt->fedAddr(iFed)+evt->fedSize(iFed)-remainder,
00957                  remainder-fedTrailerSize_);
00958           
00959           frlHeader->segsize=remainder-fedTrailerSize_;
00960           fedTrailerLeft    =true;
00961           leftspace        -=(remainder-fedTrailerSize_);
00962           remainder         =fedTrailerSize_;
00963         }
00964         // the remaining payload fits only partially, fill whole block
00965         else {
00966           memcpy(startOfFedBlocks,
00967                  evt->fedAddr(iFed)+evt->fedSize(iFed)-remainder,payload);
00968           remainder-=payload;
00969           leftspace =0;
00970         }
00971       }
00972       
00976       if(remainder==0) {
00977         
00978         // loop on feds
00979         while(iFed<nFed) {
00980           
00981           // if the next header does not fit, jump to following block
00982           if((int)leftspace<fedHeaderSize_) {
00983             frlHeader->segsize-=leftspace;
00984             break;
00985           }
00986           
00987           memcpy(startOfFedBlocks,evt->fedAddr(iFed),fedHeaderSize_);
00988           
00989           leftspace       -=fedHeaderSize_;
00990           startOfFedBlocks+=fedHeaderSize_;
00991           
00992           // fed fits with its trailer
00993           if(evt->fedSize(iFed)-fedHeaderSize_<=leftspace) {
00994             memcpy(startOfFedBlocks,
00995                    evt->fedAddr(iFed)+fedHeaderSize_,
00996                    evt->fedSize(iFed)-fedHeaderSize_);
00997             
00998             leftspace       -=(evt->fedSize(iFed)-fedHeaderSize_);
00999             startOfFedBlocks+=(evt->fedSize(iFed)-fedHeaderSize_);
01000           }
01001           // fed payload fits only without fed trailer
01002           else if(evt->fedSize(iFed)-fedHeaderSize_-fedTrailerSize_<=leftspace) {
01003             memcpy(startOfFedBlocks,
01004                    evt->fedAddr(iFed)+fedHeaderSize_,
01005                    evt->fedSize(iFed)-fedHeaderSize_-fedTrailerSize_);
01006             
01007             leftspace         -=(evt->fedSize(iFed)-fedHeaderSize_-fedTrailerSize_);
01008             frlHeader->segsize-=leftspace;
01009             fedTrailerLeft     =true;
01010             remainder          =fedTrailerSize_;
01011             
01012             break;
01013           }
01014           // fed payload fits only partially
01015           else {
01016             memcpy(startOfFedBlocks,evt->fedAddr(iFed)+fedHeaderSize_,leftspace);
01017             remainder=evt->fedSize(iFed)-fedHeaderSize_-leftspace;
01018             leftspace=0;
01019             
01020             break;
01021           }
01022           
01023           // !! increase iFed !!
01024           iFed++;
01025           
01026         } // while (iFed<fedN_)
01027         
01028         // earmark the last block
01029         if (iFed==nFed && remainder==0 && !last) {
01030           frlHeader->segsize-=leftspace;
01031           int msgSize=stdMsg->MessageSize << 2;
01032           msgSize   -=leftspace;
01033           bufRef->setDataSize(msgSize);
01034           stdMsg->MessageSize=msgSize >> 2;
01035           frlHeader->segsize =frlHeader->segsize | FRL_LAST_SEGM;
01036           last=true;
01037         }
01038         
01039       } // if (remainder==0)
01040       
01041       if(iSuperFrag==0&&iBlock==0) { // This is the first fragment block / message
01042         head=bufRef;
01043         tail=bufRef;
01044       }
01045       else {
01046         tail->setNextReference(bufRef);
01047         tail=bufRef;
01048       }
01049       
01050       if((iBlock==nBlock-1) && remainder!=0) {
01051         nBlock++;
01052         warning=true;
01053       }
01054       
01055     } // for (iBlock)
01056     
01057     // fix case where block estimate was wrong
01058     if(warning) {
01059       toolbox::mem::Reference* next=head;
01060       do {
01061         block =(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)next->getDataLocation();
01062         if (block->superFragmentNb==iSuperFrag)
01063           block->nbBlocksInSuperFragment=nBlock;                
01064       } while((next=next->getNextReference()));
01065     }
01066   
01067   } // iSuperFrag < nSuperFrag
01068   
01069   return head; // return the top of the chain
01070 }
01071 
01072 //______________________________________________________________________________
01073 void BU::dumpFrame(unsigned char* data,unsigned int len)
01074 {
01075   char left1[20];
01076   char left2[20];
01077   char right1[20];
01078   char right2[20];
01079 
01080   printf("Byte  0  1  2  3  4  5  6  7\n");
01081   
01082   int c(0);
01083   int pos(0);
01084   
01085   for (unsigned int i=0;i<(len/8);i++) {
01086     int rpos(0);
01087     int off(3);
01088     for (pos=0;pos<12;pos+=3) {
01089       sprintf(&left1[pos],"%2.2x ",
01090               ((unsigned char*)data)[c+off]);
01091       sprintf(&right1[rpos],"%1c",
01092               ((data[c+off] > 32)&&(data[c+off] < 127)) ? data[c+off] : '.');
01093       sprintf (&left2[pos],"%2.2x ",
01094                ((unsigned char*)data)[c+off+4]);
01095       sprintf (&right2[rpos],"%1c",
01096                ((data[c+off+4] > 32)&&(data[c+off+4]<127)) ? data[c+off+4] : '.');
01097       rpos++;
01098       off--;
01099     }
01100     c+=8;
01101     
01102     printf ("%4d: %s%s ||  %s%s  %p\n",
01103             c-8, left1, left2, right1, right2, &data[c-8]);
01104   }
01105   
01106   fflush(stdout);       
01107 }
01108 
01109 
01111 // xdaq instantiator implementation macro
01113 
01114 XDAQ_INSTANTIATOR_IMPL(BU)