CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_9_patch3/src/EventFilter/AutoBU/src/BU.cc

Go to the documentation of this file.
00001 
00002 //
00003 // BU
00004 // --
00005 //
00006 //                                         Emilio Meschi <emilio.meschi@cern.ch>
00007 //                       Philipp Schieferdecker <philipp.schieferdecker@cern.ch>
00009 
00010 
00011 #include "EventFilter/AutoBU/interface/BU.h"
00012 
00013 #include "FWCore/Utilities/interface/CRC16.h"
00014 
00015 #include "DataFormats/FEDRawData/interface/FEDNumbering.h"
00016 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
00017 #include "EventFilter/FEDInterface/interface/GlobalEventNumber.icc"
00018 
00019 #include "xoap/SOAPEnvelope.h"
00020 #include "xoap/SOAPBody.h"
00021 #include "xoap/domutils.h"
00022 
00023 #include <netinet/in.h>
00024 #include <sstream>
00025 
00026 
00027 using namespace std;
00028 using namespace evf;
00029 
00030 
00032 // construction/destruction
00034 
00035 //______________________________________________________________________________
00036 BU::BU(xdaq::ApplicationStub *s) 
00037   : xdaq::Application(s)
00038   , log_(getApplicationLogger())
00039   , buAppDesc_(getApplicationDescriptor())
00040   , fuAppDesc_(0)
00041   , buAppContext_(getApplicationContext())
00042   , fsm_(this)
00043   , gui_(0)
00044   , evtNumber_(0)
00045   , isBuilding_(false)
00046   , isSending_(false)
00047   , isHalting_(false)
00048   , wlBuilding_(0)
00049   , asBuilding_(0)
00050   , wlSending_(0)
00051   , asSending_(0)
00052   , wlMonitoring_(0)
00053   , asMonitoring_(0)
00054   , instance_(0)
00055   , runNumber_(0)
00056   , memUsedInMB_(0.0)
00057   , deltaT_(0.0)
00058   , deltaN_(0)
00059   , deltaSumOfSquares_(0)
00060   , deltaSumOfSizes_(0)
00061   , throughput_(0.0)
00062   , average_(0.0)
00063   , rate_(0.0)
00064   , rms_(0.0)
00065   , nbEventsInBU_(0)
00066   , nbEventsRequested_(0)
00067   , nbEventsBuilt_(0)
00068   , nbEventsSent_(0)
00069   , nbEventsDiscarded_(0)
00070   , mode_("RANDOM")
00071   , replay_(false)
00072   , crc_(true)
00073   , overwriteEvtId_(true)
00074   , overwriteLsId_(false)
00075   , fakeLsUpdateSecs_(23)
00076   , firstEvent_(1)
00077   , queueSize_(32)
00078   , eventBufferSize_(0x400000)
00079   , msgBufferSize_(32768)
00080   , fedSizeMax_(65536)
00081   , fedSizeMean_(1024)
00082   , fedSizeWidth_(1024)
00083   , useFixedFedSize_(false)
00084   , monSleepSec_(1)
00085   , fakeLs_(0)
00086   , gaussianMean_(0.0)
00087   , gaussianWidth_(1.0)
00088   , monLastN_(0)
00089   , monLastSumOfSquares_(0)
00090   , monLastSumOfSizes_(0)
00091   , sumOfSquares_(0)
00092   , sumOfSizes_(0)
00093   , i2oPool_(0)
00094 {
00095   // initialize state machine
00096   fsm_.initialize<evf::BU>(this);
00097   
00098   // initialize application info
00099   url_     =
00100     getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00101     getApplicationDescriptor()->getURN();
00102   class_   =getApplicationDescriptor()->getClassName();
00103   instance_=getApplicationDescriptor()->getInstance();
00104   hostname_=getApplicationDescriptor()->getContextDescriptor()->getURL();
00105   sourceId_=class_.toString()+instance_.toString();
00106   
00107   // i2o callbacks
00108   i2o::bind(this,&BU::I2O_BU_ALLOCATE_Callback,I2O_BU_ALLOCATE,XDAQ_ORGANIZATION_ID);
00109   i2o::bind(this,&BU::I2O_BU_DISCARD_Callback, I2O_BU_DISCARD, XDAQ_ORGANIZATION_ID);
00110   
00111   // allocate i2o memery pool
00112   string i2oPoolName=sourceId_+"_i2oPool";
00113   try {
00114     toolbox::mem::HeapAllocator *allocator=new toolbox::mem::HeapAllocator();
00115     toolbox::net::URN urn("toolbox-mem-pool",i2oPoolName);
00116     toolbox::mem::MemoryPoolFactory* poolFactory=
00117       toolbox::mem::getMemoryPoolFactory();
00118     i2oPool_=poolFactory->createPool(urn,allocator);
00119   }
00120   catch (toolbox::mem::exception::Exception& e) {
00121     string s="Failed to create pool: "+i2oPoolName;
00122     LOG4CPLUS_FATAL(log_,s);
00123     XCEPT_RETHROW(xcept::Exception,s,e);
00124   }
00125   
00126   // web interface
00127   xgi::bind(this,&evf::BU::webPageRequest,"Default");
00128   gui_=new WebGUI(this,&fsm_);
00129   gui_->setSmallAppIcon("/rubuilder/bu/images/bu32x32.gif");
00130   gui_->setLargeAppIcon("/rubuilder/bu/images/bu64x64.gif");
00131   
00132   vector<toolbox::lang::Method*> methods=gui_->getMethods();
00133   vector<toolbox::lang::Method*>::iterator it;
00134   for (it=methods.begin();it!=methods.end();++it) {
00135     if ((*it)->type()=="cgi") {
00136       string name=static_cast<xgi::MethodSignature*>(*it)->name();
00137       xgi::bind(this,&evf::BU::webPageRequest,name);
00138     }
00139   }
00140   xgi::bind(this,&evf::BU::customWebPage,"customWebPage");
00141   
00142   
00143   // export parameters to info space(s)
00144   exportParameters();
00145 
00146   // findRcmsStateListener
00147   fsm_.findRcmsStateListener();
00148   
00149   // compute parameters for fed size generation (a la Emilio)
00150   gaussianMean_ =std::log((double)fedSizeMean_);
00151   gaussianWidth_=std::sqrt(std::log
00152                            (0.5*
00153                             (1+std::sqrt
00154                              (1.0+4.0*
00155                               fedSizeWidth_.value_*fedSizeWidth_.value_/
00156                               fedSizeMean_.value_/fedSizeMean_.value_))));
00157 
00158   // start monitoring thread, once and for all
00159   startMonitoringWorkLoop();
00160   
00161   // propagate crc flag to BUEvent
00162   BUEvent::setComputeCrc(crc_.value_);
00163 }
00164 
00165 
00166 //______________________________________________________________________________
00167 BU::~BU()
00168 {
00169   while (!events_.empty()) { delete events_.back(); events_.pop_back(); }
00170 }
00171 
00172 
00174 // implementation of member functions
00176 
00177 //______________________________________________________________________________
00178 bool BU::configuring(toolbox::task::WorkLoop* wl)
00179 {
00180   isHalting_=false;
00181   try {
00182     LOG4CPLUS_INFO(log_,"Start configuring ...");
00183     reset();
00184     LOG4CPLUS_INFO(log_,"Finished configuring!");
00185     fsm_.fireEvent("ConfigureDone",this);
00186   }
00187   catch (xcept::Exception &e) {
00188     string msg = "configuring FAILED: " + (string)e.what();
00189     fsm_.fireFailed(msg,this);
00190   }
00191 
00192   return false;
00193 }
00194 
00195 
00196 //______________________________________________________________________________
00197 bool BU::enabling(toolbox::task::WorkLoop* wl)
00198 {
00199   isHalting_=false;
00200   try {
00201     LOG4CPLUS_INFO(log_,"Start enabling ...");
00202     // determine valid fed ids (assumes Playback EP is already configured hence PBRDP::instance 
00203     // not null in case we are playing back)
00204     if (0!=PlaybackRawDataProvider::instance()) {
00205       for (unsigned int i=0;i<(unsigned int)FEDNumbering::MAXFEDID+1;i++)
00206         if (FEDNumbering::inRange(i)) validFedIds_.push_back(i);
00207     }
00208     else{
00209       for (unsigned int i=0;i<(unsigned int)FEDNumbering::MAXFEDID+1;i++)
00210         if (FEDNumbering::inRangeNoGT(i)) validFedIds_.push_back(i);
00211     }
00212     if (!isBuilding_) startBuildingWorkLoop();
00213     if (!isSending_)  startSendingWorkLoop();
00214     LOG4CPLUS_INFO(log_,"Finished enabling!");
00215     fsm_.fireEvent("EnableDone",this);
00216   }
00217   catch (xcept::Exception &e) {
00218     string msg = "enabling FAILED: " + (string)e.what();
00219     fsm_.fireFailed(msg,this);
00220   }
00221   
00222   return false;
00223 }
00224 
00225 
00226 //______________________________________________________________________________
00227 bool BU::stopping(toolbox::task::WorkLoop* wl)
00228 {
00229   try {
00230     LOG4CPLUS_INFO(log_,"Start stopping :) ...");
00231 
00232     if (0!=PlaybackRawDataProvider::instance()) { /*&&
00233         (!replay_.value_||nbEventsBuilt_<(uint32_t)events_.size())) { */
00234       lock();
00235       freeIds_.push(events_.size()); 
00236       unlock();
00237       postBuild();
00238       while (!builtIds_.empty()) {
00239         LOG4CPLUS_INFO(log_,"wait to flush ... #builtIds="<<builtIds_.size());
00240 	::sleep(1);
00241       }
00242       // let the playback go to the last event and exit
00243       PlaybackRawDataProvider::instance()->setFreeToEof(); 
00244       while (!PlaybackRawDataProvider::instance()->areFilesClosed()) usleep(1000000);
00245       usleep(100000);
00246     }
00247     
00248     lock();
00249     builtIds_.push(events_.size());
00250     unlock();
00251 
00252     postSend();
00253     while (!sentIds_.empty()) {
00254       LOG4CPLUS_INFO(log_,"wait to flush ...");
00255       ::sleep(1);
00256     }
00257     reset();
00258     //postBuild();
00259     /* this is not needed and should not run if reset is called
00260     if (0!=PlaybackRawDataProvider::instance()&&
00261         (replay_.value_&&nbEventsBuilt_>=(uint32_t)events_.size())) {
00262       lock();
00263       freeIds_.push(events_.size());
00264       unlock();
00265       postBuild();
00266     }
00267     */
00268     LOG4CPLUS_INFO(log_,"Finished stopping!");
00269     fsm_.fireEvent("StopDone",this);
00270   }
00271   catch (xcept::Exception &e) {
00272     string msg = "stopping FAILED: " + (string)e.what();
00273     fsm_.fireFailed(msg,this);
00274   }
00275   return false;
00276 }
00277 
00278 
00279 //______________________________________________________________________________
00280 bool BU::halting(toolbox::task::WorkLoop* wl)
00281 {
00282   try {
00283     LOG4CPLUS_INFO(log_,"Start halting ...");
00284     isHalting_=true;
00285     if (isBuilding_&&isSending_) {
00286       lock();
00287       freeIds_.push(events_.size());
00288       builtIds_.push(events_.size());
00289       unlock();
00290       postBuild();
00291       postSend();
00292     }
00293     if (0!=PlaybackRawDataProvider::instance()&&
00294         (!replay_.value_||nbEventsBuilt_<(uint32_t)events_.size())) { 
00295       PlaybackRawDataProvider::instance()->setFreeToEof();
00296       while (!PlaybackRawDataProvider::instance()->areFilesClosed()) usleep(1000000);
00297       usleep(100000);
00298     }
00299     LOG4CPLUS_INFO(log_,"Finished halting!");
00300     fsm_.fireEvent("HaltDone",this);
00301   }
00302   catch (xcept::Exception &e) {
00303     string msg = "halting FAILED: " + (string)e.what();
00304     fsm_.fireFailed(msg,this);
00305   }
00306   return false;
00307 }
00308 
00309 
00310 //______________________________________________________________________________
00311 xoap::MessageReference BU::fsmCallback(xoap::MessageReference msg)
00312   throw (xoap::exception::Exception)
00313 {
00314   return fsm_.commandCallback(msg);
00315 }
00316 
00317 
00318 //______________________________________________________________________________
00319 void BU::I2O_BU_ALLOCATE_Callback(toolbox::mem::Reference *bufRef)
00320 {
00321   if (isHalting_) {
00322     LOG4CPLUS_WARN(log_,"Ignore BU_ALLOCATE message while halting.");
00323     bufRef->release();
00324     return;
00325   }
00326   
00327   I2O_MESSAGE_FRAME             *stdMsg;
00328   I2O_BU_ALLOCATE_MESSAGE_FRAME *msg;
00329   
00330   stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
00331   msg   =(I2O_BU_ALLOCATE_MESSAGE_FRAME*)stdMsg;
00332   
00333   if (0==fuAppDesc_) {
00334     I2O_TID fuTid=stdMsg->InitiatorAddress;
00335     fuAppDesc_=i2o::utils::getAddressMap()->getApplicationDescriptor(fuTid);
00336   }
00337   
00338   for (unsigned int i=0;i<msg->n;i++) {
00339     unsigned int fuResourceId=msg->allocate[i].fuTransactionId;
00340     lock();
00341     rqstIds_.push(fuResourceId);
00342     postRqst();
00343     nbEventsRequested_++;
00344     nbEventsInBU_++;
00345     unlock();
00346   }
00347 
00348   bufRef->release();
00349 }
00350 
00351 
00352 //______________________________________________________________________________
00353 void BU::I2O_BU_DISCARD_Callback(toolbox::mem::Reference *bufRef)
00354 {
00355   if (isHalting_) {
00356     LOG4CPLUS_WARN(log_,"Ignore BU_DISCARD message while halting.");
00357     bufRef->release();
00358     return;
00359   }
00360 
00361   I2O_MESSAGE_FRAME           *stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
00362   I2O_BU_DISCARD_MESSAGE_FRAME*msg   =(I2O_BU_DISCARD_MESSAGE_FRAME*)stdMsg;
00363   unsigned int buResourceId=msg->buResourceId[0];
00364 
00365   lock();
00366   int result=sentIds_.erase(buResourceId);
00367   unlock();
00368   
00369   if (!result) {
00370     LOG4CPLUS_ERROR(log_,"can't discard unknown buResourceId '"<<buResourceId<<"'");
00371   }
00372   else {
00373     lock();
00374     freeIds_.push(buResourceId);
00375     nbEventsDiscarded_.value_++;
00376     unlock();
00377     postBuild();
00378   }
00379   
00380   bufRef->release();
00381 }
00382 
00383 
00384 //______________________________________________________________________________
00385 void BU::actionPerformed(xdata::Event& e)
00386 {
00387   gui_->monInfoSpace()->lock();
00388   if (e.type()=="urn:xdata-event:ItemGroupRetrieveEvent") {
00389     mode_=(0==PlaybackRawDataProvider::instance())?"RANDOM":"PLAYBACK";
00390     if (0!=i2oPool_) memUsedInMB_=i2oPool_->getMemoryUsage().getUsed()*9.53674e-07;
00391     else             memUsedInMB_=0.0;
00392   }
00393   else if (e.type()=="ItemChangedEvent") {
00394     string item=dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00395     if (item=="crc") BUEvent::setComputeCrc(crc_.value_);
00396   }
00397   gui_->monInfoSpace()->unlock();
00398 }
00399 
00400 
00401 //______________________________________________________________________________
00402 void BU::webPageRequest(xgi::Input *in,xgi::Output *out)
00403   throw (xgi::exception::Exception)
00404 {
00405   string name=in->getenv("PATH_INFO");
00406   if (name.empty()) name="defaultWebPage";
00407   static_cast<xgi::MethodSignature*>(gui_->getMethod(name))->invoke(in,out);
00408 }
00409 
00410 
00411 //______________________________________________________________________________
00412 void BU::customWebPage(xgi::Input*in,xgi::Output*out)
00413   throw (xgi::exception::Exception)
00414 {
00415   *out<<"<html></html>"<<endl;
00416 }
00417 
00418 
00419 //______________________________________________________________________________
00420 void BU::startBuildingWorkLoop() throw (evf::Exception)
00421 {
00422   try {
00423     LOG4CPLUS_INFO(log_,"Start 'building' workloop");
00424     wlBuilding_=
00425       toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+
00426                                                        "Building",
00427                                                        "waiting");
00428     if (!wlBuilding_->isActive()) wlBuilding_->activate();
00429     asBuilding_=toolbox::task::bind(this,&BU::building,sourceId_+"Building");
00430     wlBuilding_->submit(asBuilding_);
00431     isBuilding_=true;
00432   }
00433   catch (xcept::Exception& e) {
00434     string msg = "Failed to start workloop 'building'.";
00435     XCEPT_RETHROW(evf::Exception,msg,e);
00436   }
00437 }
00438 
00439 
00440 //______________________________________________________________________________
00441 bool BU::building(toolbox::task::WorkLoop* wl)
00442 {
00443   waitBuild();
00444   lock();
00445   unsigned int buResourceId=freeIds_.front(); freeIds_.pop();
00446   unlock();
00447   
00448   if (buResourceId>=(uint32_t)events_.size()) {
00449     LOG4CPLUS_INFO(log_,"shutdown 'building' workloop.");
00450     isBuilding_=false;
00451     return false;
00452   }
00453   
00454   if (!isHalting_) {
00455     BUEvent* evt=events_[buResourceId];
00456     if(generateEvent(evt)) {
00457       lock();
00458       nbEventsBuilt_++;
00459       builtIds_.push(buResourceId);
00460       unlock();
00461       
00462       postSend();
00463     }
00464     else {
00465       LOG4CPLUS_INFO(log_,"building:received null post");
00466       lock();
00467       unsigned int saveBUResourceId = buResourceId;
00468       //buResourceId = freeIds_.front(); freeIds_.pop();
00469       freeIds_.push(saveBUResourceId);
00470       unlock();
00471       isBuilding_=false;
00472       return false;
00473     }
00474   }
00475   return true;
00476 }
00477 
00478 
00479 //______________________________________________________________________________
00480 void BU::startSendingWorkLoop() throw (evf::Exception)
00481 {
00482   try {
00483     LOG4CPLUS_INFO(log_,"Start 'sending' workloop");
00484     wlSending_=toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+
00485                                                                 "Sending",
00486                                                                 "waiting");
00487     if (!wlSending_->isActive()) wlSending_->activate();
00488 
00489     asSending_=toolbox::task::bind(this,&BU::sending,sourceId_+"Sending");
00490     wlSending_->submit(asSending_);
00491     isSending_=true;
00492   }
00493   catch (xcept::Exception& e) {
00494     string msg = "Failed to start workloop 'sending'.";
00495     XCEPT_RETHROW(evf::Exception,msg,e);
00496   }
00497 }
00498 
00499 
00500 //______________________________________________________________________________
00501 bool BU::sending(toolbox::task::WorkLoop* wl)
00502 {
00503   waitSend();
00504   lock();
00505   unsigned int buResourceId=builtIds_.front(); builtIds_.pop();
00506   unlock();
00507   
00508   if (buResourceId>=(uint32_t)events_.size()) {
00509     LOG4CPLUS_INFO(log_,"shutdown 'sending' workloop.");
00510     isSending_=false;
00511     return false;
00512   }
00513 
00514   if (!isHalting_) {
00515     waitRqst();
00516     lock();
00517     unsigned int fuResourceId=rqstIds_.front(); rqstIds_.pop();
00518     unlock();
00519     
00520     BUEvent* evt=events_[buResourceId];
00521     toolbox::mem::Reference* msg=createMsgChain(evt,fuResourceId);
00522     
00523     lock();
00524     sumOfSquares_+=(uint64_t)evt->evtSize()*(uint64_t)evt->evtSize();
00525     sumOfSizes_  +=evt->evtSize();
00526     nbEventsInBU_--;
00527     nbEventsSent_++;
00528     sentIds_.insert(buResourceId);
00529     unlock();
00530     
00531     buAppContext_->postFrame(msg,buAppDesc_,fuAppDesc_);  
00532   }
00533   
00534   return true;
00535 }
00536 
00537 
00538 //______________________________________________________________________________
00539 void BU::startMonitoringWorkLoop() throw (evf::Exception)
00540 {
00541   struct timezone timezone;
00542   gettimeofday(&monStartTime_,&timezone);
00543   
00544   try {
00545     LOG4CPLUS_INFO(log_,"Start 'monitoring' workloop");
00546     wlMonitoring_=
00547       toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+
00548                                                        "Monitoring",
00549                                                        "waiting");
00550     if (!wlMonitoring_->isActive()) wlMonitoring_->activate();
00551     asMonitoring_=toolbox::task::bind(this,&BU::monitoring,sourceId_+"Monitoring");
00552     wlMonitoring_->submit(asMonitoring_);
00553   }
00554   catch (xcept::Exception& e) {
00555     string msg = "Failed to start workloop 'monitoring'.";
00556     XCEPT_RETHROW(evf::Exception,msg,e);
00557   }
00558 }
00559 
00560 
00561 //______________________________________________________________________________
00562 bool BU::monitoring(toolbox::task::WorkLoop* wl)
00563 {
00564   struct timeval  monEndTime;
00565   struct timezone timezone;
00566   
00567   gettimeofday(&monEndTime,&timezone);
00568   
00569   lock();
00570   unsigned int monN           =nbEventsBuilt_.value_;
00571   uint64_t     monSumOfSquares=sumOfSquares_;
00572   unsigned int monSumOfSizes  =sumOfSizes_;
00573   uint64_t     deltaSumOfSquares;
00574   unlock();
00575   
00576   gui_->monInfoSpace()->lock();
00577   
00578   deltaT_.value_=deltaT(&monStartTime_,&monEndTime);
00579   monStartTime_=monEndTime;
00580   
00581   deltaN_.value_=monN-monLastN_;
00582   monLastN_=monN;
00583 
00584   deltaSumOfSquares=monSumOfSquares-monLastSumOfSquares_;
00585   deltaSumOfSquares_.value_=(double)deltaSumOfSquares;
00586   monLastSumOfSquares_=monSumOfSquares;
00587   
00588   deltaSumOfSizes_.value_=monSumOfSizes-monLastSumOfSizes_;
00589   monLastSumOfSizes_=monSumOfSizes;
00590   
00591   if (deltaT_.value_!=0) {
00592     throughput_=deltaSumOfSizes_.value_/deltaT_.value_;
00593     rate_      =deltaN_.value_/deltaT_.value_;
00594   }
00595   else {
00596     throughput_=0.0;
00597     rate_      =0.0;
00598   }
00599   
00600   double meanOfSquares,mean,squareOfMean,variance;
00601   
00602   if(deltaN_.value_!=0) {
00603     meanOfSquares=deltaSumOfSquares_.value_/((double)(deltaN_.value_));
00604     mean=((double)(deltaSumOfSizes_.value_))/((double)(deltaN_.value_));
00605     squareOfMean=mean*mean;
00606     variance=meanOfSquares-squareOfMean;
00607     average_=deltaSumOfSizes_.value_/deltaN_.value_;
00608     rms_    =std::sqrt(variance);
00609   }
00610   else {
00611     average_=0.0;
00612     rms_    =0.0;
00613   }
00614 
00615   gui_->monInfoSpace()->unlock();
00616   
00617   ::sleep(monSleepSec_.value_);
00618 
00619   return true;
00620 }
00621     
00622 
00623 
00625 // implementation of private member functions
00627 
00628 //______________________________________________________________________________
00629 void BU::exportParameters()
00630 {
00631   if (0==gui_) {
00632     LOG4CPLUS_ERROR(log_,"No GUI, can't export parameters");
00633     return;
00634   }
00635   
00636   gui_->addMonitorParam("url",                &url_);
00637   gui_->addMonitorParam("class",              &class_);
00638   gui_->addMonitorParam("instance",           &instance_);
00639   gui_->addMonitorParam("hostname",           &hostname_);
00640   gui_->addMonitorParam("runNumber",          &runNumber_);
00641   gui_->addMonitorParam("stateName",          fsm_.stateName());
00642   gui_->addMonitorParam("memUsedInMB",        &memUsedInMB_);
00643   gui_->addMonitorParam("deltaT",             &deltaT_);
00644   gui_->addMonitorParam("deltaN",             &deltaN_);
00645   gui_->addMonitorParam("deltaSumOfSquares",  &deltaSumOfSquares_);
00646   gui_->addMonitorParam("deltaSumOfSizes",    &deltaSumOfSizes_);
00647   gui_->addMonitorParam("throughput",         &throughput_);
00648   gui_->addMonitorParam("average",            &average_);
00649   gui_->addMonitorParam("rate",               &rate_);
00650   gui_->addMonitorParam("rms",                &rms_);
00651 
00652   gui_->addMonitorCounter("nbEvtsInBU",       &nbEventsInBU_);
00653   gui_->addMonitorCounter("nbEvtsRequested",  &nbEventsRequested_);
00654   gui_->addMonitorCounter("nbEvtsBuilt",      &nbEventsBuilt_);
00655   gui_->addMonitorCounter("nbEvtsSent",       &nbEventsSent_);
00656   gui_->addMonitorCounter("nbEvtsDiscarded",  &nbEventsDiscarded_);
00657 
00658   gui_->addStandardParam("mode",              &mode_);
00659   gui_->addStandardParam("replay",            &replay_);
00660   gui_->addStandardParam("overwriteEvtId",    &overwriteEvtId_);
00661   gui_->addStandardParam("overwriteLsId",     &overwriteLsId_);
00662   gui_->addStandardParam("fakeLsUpdateSecs",   &fakeLsUpdateSecs_);
00663   gui_->addStandardParam("crc",               &crc_);
00664   gui_->addStandardParam("firstEvent",        &firstEvent_);
00665   gui_->addStandardParam("queueSize",         &queueSize_);
00666   gui_->addStandardParam("eventBufferSize",   &eventBufferSize_);
00667   gui_->addStandardParam("msgBufferSize",     &msgBufferSize_);
00668   gui_->addStandardParam("fedSizeMax",        &fedSizeMax_);
00669   gui_->addStandardParam("fedSizeMean",       &fedSizeMean_);
00670   gui_->addStandardParam("fedSizeWidth",      &fedSizeWidth_);
00671   gui_->addStandardParam("useFixedFedSize",   &useFixedFedSize_);
00672   gui_->addStandardParam("monSleepSec",       &monSleepSec_);
00673   gui_->addStandardParam("rcmsStateListener",     fsm_.rcmsStateListener());
00674   gui_->addStandardParam("foundRcmsStateListener",fsm_.foundRcmsStateListener());
00675 
00676   
00677   gui_->exportParameters();
00678 
00679   gui_->addItemChangedListener("crc",this);
00680   
00681 }
00682 
00683 
00684 //______________________________________________________________________________
00685 void BU::reset()
00686 {
00687   gui_->resetCounters();
00688   
00689   deltaT_             =0.0;
00690   deltaN_             =  0;
00691   deltaSumOfSquares_  =  0;
00692   deltaSumOfSizes_    =  0;
00693   
00694   throughput_         =0.0;
00695   average_            =  0;
00696   rate_               =  0;
00697   rms_                =  0;
00698 
00699   monLastN_           =  0;
00700   monLastSumOfSquares_=  0;
00701   monLastSumOfSizes_  =  0;
00702   
00703   while (events_.size()) {
00704     delete events_.back();
00705     events_.pop_back();
00706   }
00707   
00708   while (!rqstIds_.empty())  rqstIds_.pop();
00709   while (!freeIds_.empty())  freeIds_.pop();
00710   while (!builtIds_.empty()) builtIds_.pop();
00711   sentIds_.clear();
00712  
00713   sem_init(&lock_,0,1);
00714   sem_init(&buildSem_,0,queueSize_);
00715   sem_init(&sendSem_,0,0);
00716   sem_init(&rqstSem_,0,0);
00717   
00718   for (unsigned int i=0;i<queueSize_;i++) {
00719     events_.push_back(new BUEvent(i,eventBufferSize_));
00720     freeIds_.push(i);
00721   }
00722   validFedIds_.clear();
00723   fakeLs_=0;
00724 }
00725 
00726 //______________________________________________________________________________
00727 double BU::deltaT(const struct timeval *start,const struct timeval *end)
00728 {
00729   unsigned int  sec;
00730   unsigned int  usec;
00731   
00732   sec = end->tv_sec - start->tv_sec;
00733   
00734   if(end->tv_usec > start->tv_usec) {
00735     usec = end->tv_usec - start->tv_usec;
00736   }
00737   else {
00738     sec--;
00739     usec = 1000000 - ((unsigned int )(start->tv_usec - end->tv_usec));
00740   }
00741   
00742   return ((double)sec) + ((double)usec) / 1000000.0;
00743 }
00744 
00745 
00746 //______________________________________________________________________________
00747 bool BU::generateEvent(BUEvent* evt)
00748 {
00749   // replay?
00750   if (replay_.value_&&nbEventsBuilt_>=(uint32_t)events_.size()) 
00751     {
00752       if (0!=PlaybackRawDataProvider::instance())
00753         PlaybackRawDataProvider::instance()->setFreeToEof();
00754       return true;
00755     }  
00756   // PLAYBACK mode
00757   if (0!=PlaybackRawDataProvider::instance()) {
00758     
00759     unsigned int runNumber,evtNumber;
00760 
00761     FEDRawDataCollection* event=
00762       PlaybackRawDataProvider::instance()->getFEDRawData(runNumber,evtNumber);
00763     if(event == 0) return false;
00764     evt->initialize(evtNumber);
00765     
00766     for (unsigned int i=0;i<validFedIds_.size();i++) {
00767       unsigned int   fedId  =validFedIds_[i];
00768       unsigned int   fedSize=event->FEDData(fedId).size();
00769       unsigned char* fedAddr=event->FEDData(fedId).data();
00770       if (overwriteEvtId_.value_ && fedAddr != 0) {
00771         fedh_t *fedHeader=(fedh_t*)fedAddr;
00772         fedHeader->eventid=(fedHeader->eventid&0xFF000000)+(evtNumber&0x00FFFFFF);
00773       }
00774       if (fedSize>0) evt->writeFed(fedId,fedAddr,fedSize);
00775     }
00776     delete event;
00777   }
00778   // RANDOM mode
00779   else {
00780     unsigned int evtNumber=(firstEvent_+evtNumber_++)%0x1000000;
00781     evt->initialize(evtNumber);
00782     unsigned int fedSizeMin=fedHeaderSize_+fedTrailerSize_;
00783     for (unsigned int i=0;i<validFedIds_.size();i++) {
00784       unsigned int fedId(validFedIds_[i]);
00785       unsigned int fedSize(fedSizeMean_);
00786       if (!useFixedFedSize_) {
00787         double logFedSize=CLHEP::RandGauss::shoot(gaussianMean_,gaussianWidth_);
00788         fedSize=(unsigned int)(std::exp(logFedSize));
00789         if (fedSize<fedSizeMin)  fedSize=fedSizeMin;
00790         if (fedSize>fedSizeMax_) fedSize=fedSizeMax_;
00791         fedSize-=fedSize%8;
00792       }
00793       
00794       evt->writeFed(fedId,0,fedSize);
00795       evt->writeFedHeader(i);
00796       evt->writeFedTrailer(i);
00797     }
00798     
00799   }
00800   return true;
00801 }
00802 
00803 
00804 //______________________________________________________________________________
00805 toolbox::mem::Reference *BU::createMsgChain(BUEvent* evt,
00806                                             unsigned int fuResourceId)
00807 {
00808   unsigned int msgHeaderSize =sizeof(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME);
00809   unsigned int msgPayloadSize=msgBufferSize_-msgHeaderSize;
00810 
00811   if((msgPayloadSize%4)!=0) LOG4CPLUS_ERROR(log_,"Invalid Payload Size.");
00812  
00813   /*Overwrite lumisection value stored in the event*/
00814   if (overwriteLsId_.value_) {
00815     //getting new time and increase LS if past 23 sec
00816     struct timezone tz;
00817     if (!fakeLs_) {
00818       fakeLs_++;
00819       gettimeofday(&lastLsUpdate_,&tz);
00820     }
00821     else {
00822       timeval newLsUpdate;
00823       gettimeofday(&newLsUpdate,&tz);
00824       if ((unsigned long)1000000*newLsUpdate.tv_sec+newLsUpdate.tv_usec 
00825           - (unsigned  long)1000000*lastLsUpdate_.tv_sec+lastLsUpdate_.tv_usec
00826           >= fakeLsUpdateSecs_.value_*1000000) 
00827       {
00828         fakeLs_++;
00829         lastLsUpdate_=newLsUpdate;
00830       }
00831     }
00832 
00833     int gtpFedPos_=-1;
00834     int egtpFedPos_=-1;
00835     for (size_t k=0;k<validFedIds_.size();k++) {
00836       if (evt->fedId(k)==FEDNumbering::MINTriggerGTPFEDID) {
00837       //insert ls value into gtp fed
00838         unsigned char * fgtpAddr = evt->fedAddr(k);
00839         unsigned int fgtpSize = evt->fedSize(k);
00840         if (fgtpAddr && fgtpSize) {
00841           gtpFedPos_=(int)k;
00842           evtn::evm_board_sense(fgtpAddr,fgtpSize);
00843           *((unsigned short*)fgtpAddr
00844               +sizeof(fedh_t)/sizeof(unsigned short)
00845               + (evtn::EVM_GTFE_BLOCK*2 + evtn::EVM_TCS_LSBLNR_OFFSET)*evtn::SLINK_HALFWORD_SIZE /sizeof(unsigned short)
00846            ) = (unsigned short)fakeLs_-1;
00847         }
00848       }
00849       if (evt->fedId(k)==FEDNumbering::MINTriggerEGTPFEDID) {
00850         //insert orbit value into gtpe fed
00851         unsigned char * fegtpAddr = evt->fedAddr(egtpFedPos_);
00852         unsigned int fegtpSize = evt->fedSize(egtpFedPos_);
00853         if (fegtpAddr && fegtpSize) {
00854           egtpFedPos_=(int)k;
00855           *(  (unsigned int*)fegtpAddr + evtn::GTPE_ORBTNR_OFFSET * evtn::SLINK_HALFWORD_SIZE/sizeof(unsigned int)
00856            ) = (unsigned int)(fakeLs_-1)*0x00100000;
00857         }
00858       }
00859     }
00860     if (gtpFedPos_<0) LOG4CPLUS_ERROR(log_,"Unable to find GTP FED in event!");
00861     if (egtpFedPos_<0 && gtpFedPos_<0) LOG4CPLUS_ERROR(log_,"Unable to find GTP or GTPE FED in event!");
00862   }
00863 
00864   toolbox::mem::Reference *head  =0;
00865   toolbox::mem::Reference *tail  =0;
00866   toolbox::mem::Reference *bufRef=0;
00867   
00868   I2O_MESSAGE_FRAME                  *stdMsg=0;
00869   I2O_PRIVATE_MESSAGE_FRAME          *pvtMsg=0;
00870   I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block =0;
00871   
00872   unsigned int iFed            =0;
00873   unsigned int nSuperFrag      =64;
00874   unsigned int nFedPerSuperFrag=validFedIds_.size()/nSuperFrag;
00875   unsigned int nBigSuperFrags  =validFedIds_.size()%nSuperFrag;
00876   
00877   if (evt->nFed()<nSuperFrag) {
00878     nSuperFrag=evt->nFed();
00879     nFedPerSuperFrag=1;
00880     nBigSuperFrags=0;
00881   }
00882   else
00883     {
00884       nFedPerSuperFrag=evt->nFed()/nSuperFrag;
00885       nBigSuperFrags  =evt->nFed()%nSuperFrag;
00886     }
00887   // loop over all super fragments
00888   for (unsigned int iSuperFrag=0;iSuperFrag<nSuperFrag;iSuperFrag++) {
00889     
00890     // compute index of last fed in this super fragment
00891     unsigned int nFed=iFed+nFedPerSuperFrag;
00892     if (iSuperFrag<nBigSuperFrags) ++nFed;
00893     
00894     // compute number of blocks in this super fragment
00895     unsigned int nBlock  =0;
00896     unsigned int curbSize=frlHeaderSize_;
00897     unsigned int totSize =curbSize;
00898     for (unsigned int i=iFed;i<nFed;i++) {
00899       curbSize+=evt->fedSize(i);
00900       totSize+=evt->fedSize(i);
00901       if (curbSize>msgPayloadSize) {
00902         curbSize+=frlHeaderSize_*(curbSize/msgPayloadSize);
00903         if(curbSize%msgPayloadSize)totSize+=frlHeaderSize_*(curbSize/msgPayloadSize);
00904         else totSize+=frlHeaderSize_*((curbSize/msgPayloadSize)-1);
00905         curbSize=curbSize%msgPayloadSize;
00906       }
00907     }
00908     nBlock=totSize/msgPayloadSize+(totSize%msgPayloadSize>0 ? 1 : 0);
00909     
00910     
00911     // loop over all blocks (msgs) in the current super fragment
00912     unsigned int   remainder     =0;
00913     bool           fedTrailerLeft=false;
00914     bool           last          =false;
00915     bool           warning       =false;
00916     unsigned char *startOfPayload=0;
00917     U32            payload(0);
00918     
00919     for(unsigned int iBlock=0;iBlock<nBlock;iBlock++) {
00920       
00921       // If last block and its partial (there can be only 0 or 1 partial)
00922       payload=msgPayloadSize;
00923       
00924       // Allocate memory for a fragment block / message
00925       try {
00926         bufRef=toolbox::mem::getMemoryPoolFactory()->getFrame(i2oPool_,
00927                                                               msgBufferSize_);
00928       }
00929       catch(xcept::Exception &e) {
00930         LOG4CPLUS_FATAL(log_,"xdaq::frameAlloc failed");
00931       }
00932       
00933       // Fill in the fields of the fragment block / message
00934       stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
00935       pvtMsg=(I2O_PRIVATE_MESSAGE_FRAME*)stdMsg;
00936       block =(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)stdMsg;
00937       
00938       pvtMsg->XFunctionCode   =I2O_FU_TAKE;
00939       pvtMsg->OrganizationID  =XDAQ_ORGANIZATION_ID;
00940       
00941       stdMsg->MessageSize     =(msgHeaderSize + payload) >> 2;
00942       stdMsg->Function        =I2O_PRIVATE_MESSAGE;
00943       stdMsg->VersionOffset   =0;
00944       stdMsg->MsgFlags        =0;
00945       stdMsg->InitiatorAddress=i2o::utils::getAddressMap()->getTid(buAppDesc_);
00946       stdMsg->TargetAddress   =i2o::utils::getAddressMap()->getTid(fuAppDesc_);
00947       
00948       block->buResourceId           =evt->buResourceId();
00949       block->fuTransactionId        =fuResourceId;
00950       block->blockNb                =iBlock;
00951       block->nbBlocksInSuperFragment=nBlock;
00952       block->superFragmentNb        =iSuperFrag;
00953       block->nbSuperFragmentsInEvent=nSuperFrag;
00954       block->eventNumber            =evt->evtNumber();
00955       
00956       // Fill in payload 
00957       startOfPayload   =(unsigned char*)block+msgHeaderSize;
00958       frlh_t* frlHeader=(frlh_t*)startOfPayload;
00959       frlHeader->trigno=evt->evtNumber();
00960       frlHeader->segno =iBlock;
00961       
00962       unsigned char *startOfFedBlocks=startOfPayload+frlHeaderSize_;
00963       payload              -=frlHeaderSize_;
00964       frlHeader->segsize    =payload;
00965       unsigned int leftspace=payload;
00966       
00967       // a fed trailer was left over from the previous block
00968       if(fedTrailerLeft) {
00969         memcpy(startOfFedBlocks,
00970                evt->fedAddr(iFed)+evt->fedSize(iFed)-fedTrailerSize_,
00971                fedTrailerSize_);
00972         
00973         startOfFedBlocks+=fedTrailerSize_;
00974         leftspace       -=fedTrailerSize_;
00975         remainder        =0;
00976         fedTrailerLeft   =false;
00977         
00978         // if this is the last fed, adjust block (msg) size and set last=true
00979         if((iFed==nFed-1) && !last) {
00980           frlHeader->segsize-=leftspace;
00981           int msgSize=stdMsg->MessageSize << 2;
00982           msgSize   -=leftspace;
00983           bufRef->setDataSize(msgSize);
00984           stdMsg->MessageSize = msgSize >> 2;
00985           frlHeader->segsize=frlHeader->segsize | FRL_LAST_SEGM;
00986           last=true;
00987         }
00988         
00989         // !! increment iFed !!
00990         iFed++;
00991       }
00992       
00996       if (remainder>0) {
00997         
00998         // the remaining fed fits entirely into the new block
00999         if(payload>=remainder) {
01000           memcpy(startOfFedBlocks,
01001                  evt->fedAddr(iFed)+evt->fedSize(iFed)-remainder,
01002                  remainder);
01003           
01004           startOfFedBlocks+=remainder;
01005           leftspace       -=remainder;
01006           
01007           // if this is the last fed in the superfragment, earmark it
01008           if(iFed==nFed-1) {
01009             frlHeader->segsize-=leftspace;
01010             int msgSize=stdMsg->MessageSize << 2;
01011             msgSize   -=leftspace;
01012             bufRef->setDataSize(msgSize);
01013             stdMsg->MessageSize = msgSize >> 2;
01014             frlHeader->segsize=frlHeader->segsize | FRL_LAST_SEGM;
01015             last=true;
01016           }
01017           
01018           // !! increment iFed !!
01019           iFed++;
01020           
01021           // start new fed -> set remainder to 0!
01022           remainder=0;
01023         }
01024         // the remaining payload fits, but not the fed trailer
01025         else if (payload>=(remainder-fedTrailerSize_)) {
01026           memcpy(startOfFedBlocks,
01027                  evt->fedAddr(iFed)+evt->fedSize(iFed)-remainder,
01028                  remainder-fedTrailerSize_);
01029           
01030           frlHeader->segsize=remainder-fedTrailerSize_;
01031           fedTrailerLeft    =true;
01032           leftspace        -=(remainder-fedTrailerSize_);
01033           remainder         =fedTrailerSize_;
01034         }
01035         // the remaining payload fits only partially, fill whole block
01036         else {
01037           memcpy(startOfFedBlocks,
01038                  evt->fedAddr(iFed)+evt->fedSize(iFed)-remainder,payload);
01039           remainder-=payload;
01040           leftspace =0;
01041         }
01042       }
01043       
01047       if(remainder==0) {
01048         
01049         // loop on feds
01050         while(iFed<nFed) {
01051           
01052           // if the next header does not fit, jump to following block
01053           if((int)leftspace<fedHeaderSize_) {
01054             frlHeader->segsize-=leftspace;
01055             break;
01056           }
01057           
01058           memcpy(startOfFedBlocks,evt->fedAddr(iFed),fedHeaderSize_);
01059           
01060           leftspace       -=fedHeaderSize_;
01061           startOfFedBlocks+=fedHeaderSize_;
01062           
01063           // fed fits with its trailer
01064           if(evt->fedSize(iFed)-fedHeaderSize_<=leftspace) {
01065             memcpy(startOfFedBlocks,
01066                    evt->fedAddr(iFed)+fedHeaderSize_,
01067                    evt->fedSize(iFed)-fedHeaderSize_);
01068             
01069             leftspace       -=(evt->fedSize(iFed)-fedHeaderSize_);
01070             startOfFedBlocks+=(evt->fedSize(iFed)-fedHeaderSize_);
01071           }
01072           // fed payload fits only without fed trailer
01073           else if(evt->fedSize(iFed)-fedHeaderSize_-fedTrailerSize_<=leftspace) {
01074             memcpy(startOfFedBlocks,
01075                    evt->fedAddr(iFed)+fedHeaderSize_,
01076                    evt->fedSize(iFed)-fedHeaderSize_-fedTrailerSize_);
01077             
01078             leftspace         -=(evt->fedSize(iFed)-fedHeaderSize_-fedTrailerSize_);
01079             frlHeader->segsize-=leftspace;
01080             fedTrailerLeft     =true;
01081             remainder          =fedTrailerSize_;
01082             
01083             break;
01084           }
01085           // fed payload fits only partially
01086           else {
01087             memcpy(startOfFedBlocks,evt->fedAddr(iFed)+fedHeaderSize_,leftspace);
01088             remainder=evt->fedSize(iFed)-fedHeaderSize_-leftspace;
01089             leftspace=0;
01090             
01091             break;
01092           }
01093           
01094           // !! increase iFed !!
01095           iFed++;
01096           
01097         } // while (iFed<fedN_)
01098         
01099         // earmark the last block
01100         if (iFed==nFed && remainder==0 && !last) {
01101           frlHeader->segsize-=leftspace;
01102           int msgSize=stdMsg->MessageSize << 2;
01103           msgSize   -=leftspace;
01104           bufRef->setDataSize(msgSize);
01105           stdMsg->MessageSize=msgSize >> 2;
01106           frlHeader->segsize =frlHeader->segsize | FRL_LAST_SEGM;
01107           last=true;
01108         }
01109         
01110       } // if (remainder==0)
01111       
01112       if(iSuperFrag==0&&iBlock==0) { // This is the first fragment block / message
01113         head=bufRef;
01114         tail=bufRef;
01115       }
01116       else {
01117         tail->setNextReference(bufRef);
01118         tail=bufRef;
01119       }
01120       
01121       if((iBlock==nBlock-1) && remainder!=0) {
01122         nBlock++;
01123         warning=true;
01124       }
01125       
01126     } // for (iBlock)
01127     
01128     // fix case where block estimate was wrong
01129     if(warning) {
01130       toolbox::mem::Reference* next=head;
01131       do {
01132         block =(I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)next->getDataLocation();
01133         if (block->superFragmentNb==iSuperFrag)
01134           block->nbBlocksInSuperFragment=nBlock;                
01135       } while((next=next->getNextReference()));
01136     }
01137   
01138   } // iSuperFrag < nSuperFrag
01139   
01140   return head; // return the top of the chain
01141 }
01142 
01143 //______________________________________________________________________________
01144 void BU::dumpFrame(unsigned char* data,unsigned int len)
01145 {
01146   char left1[20];
01147   char left2[20];
01148   char right1[20];
01149   char right2[20];
01150 
01151   printf("Byte  0  1  2  3  4  5  6  7\n");
01152   
01153   int c(0);
01154   int pos(0);
01155   
01156   for (unsigned int i=0;i<(len/8);i++) {
01157     int rpos(0);
01158     int off(3);
01159     for (pos=0;pos<12;pos+=3) {
01160       sprintf(&left1[pos],"%2.2x ",
01161               ((unsigned char*)data)[c+off]);
01162       sprintf(&right1[rpos],"%1c",
01163               ((data[c+off] > 32)&&(data[c+off] < 127)) ? data[c+off] : '.');
01164       sprintf (&left2[pos],"%2.2x ",
01165                ((unsigned char*)data)[c+off+4]);
01166       sprintf (&right2[rpos],"%1c",
01167                ((data[c+off+4] > 32)&&(data[c+off+4]<127)) ? data[c+off+4] : '.');
01168       rpos++;
01169       off--;
01170     }
01171     c+=8;
01172     
01173     printf ("%4d: %s%s ||  %s%s  %p\n",
01174             c-8, left1, left2, right1, right2, &data[c-8]);
01175   }
01176   
01177   fflush(stdout);       
01178 }
01179 
01180 
01182 // xdaq instantiator implementation macro
01184 
01185 XDAQ_INSTANTIATOR_IMPL(BU)