00001
00002
00003
00004
00005
00006
00008
00009
00010 #include "EventFilter/ResourceBroker/interface/FUResourceTable.h"
00011 #include "FWCore/Utilities/interface/CRC16.h"
00012 #include "EvffedFillerRB.h"
00013
00014 #include "toolbox/task/WorkLoopFactory.h"
00015 #include "interface/evb/i2oEVBMsgs.h"
00016 #include "xcept/tools.h"
00017
00018
00019 #include <fstream>
00020 #include <sstream>
00021 #include <iomanip>
00022 #include <unistd.h>
00023
00024
00025 using namespace evf;
00026 using namespace std;
00027
00028
00030
00032
00033
00034 FUResourceTable::FUResourceTable(bool segmentationMode,
00035 UInt_t nbRawCells,
00036 UInt_t nbRecoCells,
00037 UInt_t nbDqmCells,
00038 UInt_t rawCellSize,
00039 UInt_t recoCellSize,
00040 UInt_t dqmCellSize,
00041 BUProxy *bu,
00042 SMProxy *sm,
00043 log4cplus::Logger logger,
00044 unsigned int timeout,
00045 EvffedFillerRB *frb,
00046 xdaq::Application*app)
00047 throw (evf::Exception)
00048 : bu_(bu)
00049 , sm_(sm)
00050 , log_(logger)
00051 , wlSendData_(0)
00052 , asSendData_(0)
00053 , wlSendDqm_(0)
00054 , asSendDqm_(0)
00055 , wlDiscard_(0)
00056 , asDiscard_(0)
00057 , shmBuffer_(0)
00058 , nbDqmCells_(nbDqmCells)
00059 , nbRawCells_(nbRawCells)
00060 , nbRecoCells_(nbRecoCells)
00061 , acceptSMDataDiscard_(0)
00062 , acceptSMDqmDiscard_(0)
00063 , doCrcCheck_(1)
00064 , shutdownTimeout_(timeout)
00065 , nbPending_(0)
00066 , nbClientsToShutDown_(0)
00067 , isReadyToShutDown_(true)
00068 , isActive_(false)
00069 , isHalting_(false)
00070 , isStopping_(false)
00071 , runNumber_(0xffffffff)
00072 , lock_(toolbox::BSem::FULL)
00073 , frb_(frb)
00074 , app_(app)
00075 {
00076 initialize(segmentationMode,
00077 nbRawCells,nbRecoCells,nbDqmCells,
00078 rawCellSize,recoCellSize,dqmCellSize);
00079 }
00080
00081
00082
00083 FUResourceTable::~FUResourceTable()
00084 {
00085 clear();
00086 if(wlSendData_){
00087 wlSendData_->cancel();
00088 toolbox::task::getWorkLoopFactory()->removeWorkLoop("SendData","waiting");
00089 }
00090 if(wlSendDqm_){
00091 wlSendDqm_->cancel();
00092 toolbox::task::getWorkLoopFactory()->removeWorkLoop("SendDqm","waiting");
00093 }
00094 if(wlDiscard_){
00095 wlDiscard_->cancel();
00096 toolbox::task::getWorkLoopFactory()->removeWorkLoop("Discard","waiting");
00097 }
00098 shmdt(shmBuffer_);
00099 if (FUShmBuffer::releaseSharedMemory())
00100 LOG4CPLUS_INFO(log_,"SHARED MEMORY SUCCESSFULLY RELEASED.");
00101 if (0!=acceptSMDataDiscard_) delete [] acceptSMDataDiscard_;
00102 if (0!= acceptSMDqmDiscard_) delete [] acceptSMDqmDiscard_;
00103 }
00104
00105
00107
00109
00110
00111 void FUResourceTable::initialize(bool segmentationMode,
00112 UInt_t nbRawCells,
00113 UInt_t nbRecoCells,
00114 UInt_t nbDqmCells,
00115 UInt_t rawCellSize,
00116 UInt_t recoCellSize,
00117 UInt_t dqmCellSize)
00118 throw (evf::Exception)
00119 {
00120 clear();
00121
00122 shmBuffer_=FUShmBuffer::createShmBuffer(segmentationMode,
00123 nbRawCells,nbRecoCells,nbDqmCells,
00124 rawCellSize,recoCellSize,dqmCellSize);
00125 if (0==shmBuffer_) {
00126 string msg = "CREATION OF SHARED MEMORY SEGMENT FAILED!";
00127 LOG4CPLUS_FATAL(log_,msg);
00128 XCEPT_RAISE(evf::Exception,msg);
00129 }
00130
00131 for (UInt_t i=0;i<nbRawCells_;i++) {
00132 resources_.push_back(new FUResource(i,log_,frb_,app_));
00133 freeResourceIds_.push(i);
00134 }
00135
00136 acceptSMDataDiscard_ = new bool[nbRecoCells];
00137 acceptSMDqmDiscard_ = new int[nbDqmCells];
00138
00139 resetCounters();
00140 }
00141
00142
00143
00144 void FUResourceTable::startSendDataWorkLoop() throw (evf::Exception)
00145 {
00146 try {
00147 wlSendData_=
00148 toolbox::task::getWorkLoopFactory()->getWorkLoop("SendData","waiting");
00149 if (!wlSendData_->isActive()) wlSendData_->activate();
00150 asSendData_=toolbox::task::bind(this,&FUResourceTable::sendData,"SendData");
00151 wlSendData_->submit(asSendData_);
00152 }
00153 catch (xcept::Exception& e) {
00154 string msg = "Failed to start workloop 'SendData'.";
00155 XCEPT_RETHROW(evf::Exception,msg,e);
00156 }
00157 }
00158
00159
00160
00161 bool FUResourceTable::sendData(toolbox::task::WorkLoop* )
00162 {
00163 bool reschedule=true;
00164
00165 FUShmRecoCell* cell=shmBuffer_->recoCellToRead();
00166
00167 if (0==cell->eventSize()) {
00168 LOG4CPLUS_INFO(log_,"Don't reschedule sendData workloop.");
00169 UInt_t cellIndex=cell->index();
00170 shmBuffer_->finishReadingRecoCell(cell);
00171 shmBuffer_->discardRecoCell(cellIndex);
00172 reschedule=false;
00173 }
00174 else if (isHalting_) {
00175 LOG4CPLUS_INFO(log_,"sendData: isHalting, discard recoCell.");
00176 UInt_t cellIndex=cell->index();
00177 shmBuffer_->finishReadingRecoCell(cell);
00178 shmBuffer_->discardRecoCell(cellIndex);
00179 }
00180 else {
00181 try {
00182 if (cell->type()==0) {
00183 UInt_t cellIndex = cell->index();
00184 UInt_t cellOutModId = cell->outModId();
00185 UInt_t cellFUProcId = cell->fuProcessId();
00186 UInt_t cellFUGuid = cell->fuGuid();
00187 UChar_t* cellPayloadAddr = cell->payloadAddr();
00188 UInt_t cellEventSize = cell->eventSize();
00189 shmBuffer_->finishReadingRecoCell(cell);
00190
00191 lock();
00192 nbPendingSMDiscards_++;
00193 unlock();
00194
00195 sendInitMessage(cellIndex,cellOutModId,cellFUProcId,cellFUGuid,
00196 cellPayloadAddr,cellEventSize);
00197 }
00198 else if (cell->type()==1) {
00199 UInt_t cellIndex = cell->index();
00200 UInt_t cellRawIndex = cell->rawCellIndex();
00201 UInt_t cellRunNumber = cell->runNumber();
00202 UInt_t cellEvtNumber = cell->evtNumber();
00203 UInt_t cellOutModId = cell->outModId();
00204 UInt_t cellFUProcId = cell->fuProcessId();
00205 UInt_t cellFUGuid = cell->fuGuid();
00206 UChar_t *cellPayloadAddr = cell->payloadAddr();
00207 UInt_t cellEventSize = cell->eventSize();
00208 shmBuffer_->finishReadingRecoCell(cell);
00209
00210 lock();
00211 nbPendingSMDiscards_++;
00212 resources_[cellRawIndex]->incNbSent();
00213 if (resources_[cellRawIndex]->nbSent()==1) nbSent_++;
00214 unlock();
00215
00216 sendDataEvent(cellIndex,cellRunNumber,cellEvtNumber,cellOutModId,
00217 cellFUProcId,cellFUGuid,cellPayloadAddr,cellEventSize);
00218 }
00219 else if (cell->type()==2) {
00220 UInt_t cellIndex = cell->index();
00221 UInt_t cellRawIndex = cell->rawCellIndex();
00222
00223 UInt_t cellEvtNumber = cell->evtNumber();
00224 UInt_t cellFUProcId = cell->fuProcessId();
00225 UInt_t cellFUGuid = cell->fuGuid();
00226 UChar_t *cellPayloadAddr = cell->payloadAddr();
00227 UInt_t cellEventSize = cell->eventSize();
00228 shmBuffer_->finishReadingRecoCell(cell);
00229
00230 lock();
00231 nbPendingSMDiscards_++;
00232 resources_[cellRawIndex]->incNbSent();
00233 if (resources_[cellRawIndex]->nbSent()==1) { nbSent_++; nbSentError_++; }
00234 unlock();
00235
00236 sendErrorEvent(cellIndex,runNumber_,cellEvtNumber,
00237 cellFUProcId,cellFUGuid,cellPayloadAddr,cellEventSize);
00238 }
00239 else {
00240 string errmsg="Unknown RecoCell type (neither INIT/DATA/ERROR).";
00241 XCEPT_RAISE(evf::Exception,errmsg);
00242 }
00243 }
00244 catch (xcept::Exception& e) {
00245 LOG4CPLUS_FATAL(log_,"Failed to send EVENT DATA to StorageManager: "
00246 <<xcept::stdformat_exception_history(e));
00247 reschedule=false;
00248 }
00249 }
00250
00251 return reschedule;
00252 }
00253
00254
00255
00256 void FUResourceTable::startSendDqmWorkLoop() throw (evf::Exception)
00257 {
00258 try {
00259 wlSendDqm_=toolbox::task::getWorkLoopFactory()->getWorkLoop("SendDqm","waiting");
00260 if (!wlSendDqm_->isActive()) wlSendDqm_->activate();
00261 asSendDqm_=toolbox::task::bind(this,&FUResourceTable::sendDqm,"SendDqm");
00262 wlSendDqm_->submit(asSendDqm_);
00263 }
00264 catch (xcept::Exception& e) {
00265 string msg = "Failed to start workloop 'SendDqm'.";
00266 XCEPT_RETHROW(evf::Exception,msg,e);
00267 }
00268 }
00269
00270
00271
00272 bool FUResourceTable::sendDqm(toolbox::task::WorkLoop* )
00273 {
00274 bool reschedule=true;
00275
00276 FUShmDqmCell* cell=shmBuffer_->dqmCellToRead();
00277 dqm::State_t state=shmBuffer_->dqmState(cell->index());
00278
00279 if (state==dqm::EMPTY) {
00280 LOG4CPLUS_WARN(log_,"Don't reschedule sendDqm workloop.");
00281 std::cout << "shut down dqm workloop " << std::endl;
00282 UInt_t cellIndex=cell->index();
00283 shmBuffer_->finishReadingDqmCell(cell);
00284 shmBuffer_->discardDqmCell(cellIndex);
00285 reschedule=false;
00286 }
00287 else if (isHalting_) {
00288 UInt_t cellIndex=cell->index();
00289 shmBuffer_->finishReadingDqmCell(cell);
00290 shmBuffer_->discardDqmCell(cellIndex);
00291 }
00292 else {
00293 try {
00294 UInt_t cellIndex = cell->index();
00295 UInt_t cellRunNumber = cell->runNumber();
00296 UInt_t cellEvtAtUpdate = cell->evtAtUpdate();
00297 UInt_t cellFolderId = cell->folderId();
00298 UInt_t cellFUProcId = cell->fuProcessId();
00299 UInt_t cellFUGuid = cell->fuGuid();
00300 UChar_t *cellPayloadAddr = cell->payloadAddr();
00301 UInt_t cellEventSize = cell->eventSize();
00302 sendDqmEvent(cellIndex,cellRunNumber,cellEvtAtUpdate,cellFolderId,
00303 cellFUProcId,cellFUGuid,cellPayloadAddr,cellEventSize);
00304 shmBuffer_->finishReadingDqmCell(cell);
00305 }
00306 catch (xcept::Exception& e) {
00307 LOG4CPLUS_FATAL(log_,"Failed to send DQM DATA to StorageManager: "
00308 <<xcept::stdformat_exception_history(e));
00309 reschedule=false;
00310 }
00311 }
00312
00313 return reschedule;
00314 }
00315
00316
00317
00318 void FUResourceTable::startDiscardWorkLoop() throw (evf::Exception)
00319 {
00320 try {
00321 LOG4CPLUS_INFO(log_,"Start 'discard' workloop.");
00322 wlDiscard_=toolbox::task::getWorkLoopFactory()->getWorkLoop("Discard","waiting");
00323 if (!wlDiscard_->isActive()) wlDiscard_->activate();
00324 asDiscard_=toolbox::task::bind(this,&FUResourceTable::discard,"Discard");
00325 wlDiscard_->submit(asDiscard_);
00326 isActive_=true;
00327 }
00328 catch (xcept::Exception& e) {
00329 string msg = "Failed to start workloop 'Discard'.";
00330 XCEPT_RETHROW(evf::Exception,msg,e);
00331 }
00332 isReadyToShutDown_=false;
00333 }
00334
00335
00336
00337 bool FUResourceTable::discard(toolbox::task::WorkLoop* )
00338 {
00339 FUShmRawCell* cell =shmBuffer_->rawCellToDiscard();
00340 evt::State_t state=shmBuffer_->evtState(cell->index());
00341
00342 bool reschedule =true;
00343 bool shutDown =(state==evt::STOP);
00344 bool isLumi =(state==evt::LUMISECTION);
00345 UInt_t fuResourceId=cell->fuResourceId();
00346 UInt_t buResourceId=cell->buResourceId();
00347
00348
00349
00350
00351
00352 if (shutDown) {
00353 LOG4CPLUS_INFO(log_,"nbClientsToShutDown = "<<nbClientsToShutDown_);
00354 if (nbClientsToShutDown_>0) --nbClientsToShutDown_;
00355 if (nbClientsToShutDown_==0) {
00356 LOG4CPLUS_INFO(log_,"Don't reschedule discard-workloop.");
00357 isActive_ =false;
00358 reschedule=false;
00359 }
00360 }
00361
00362 shmBuffer_->discardRawCell(cell);
00363
00364 if (!shutDown && !isLumi) {
00365 resources_[fuResourceId]->release();
00366 lock();
00367 freeResourceIds_.push(fuResourceId);
00368 assert(freeResourceIds_.size()<=resources_.size());
00369 unlock();
00370
00371 if (!isHalting_) {
00372 sendDiscard(buResourceId);
00373 if(!isStopping_)sendAllocate();
00374 }
00375 }
00376
00377 if (!reschedule) {
00378 std::cout << " entered shutdown cycle " << std::endl;
00379 shmBuffer_->writeRecoEmptyEvent();
00380 UInt_t count=0;
00381 while (count<100) {
00382 std::cout << " shutdown cycle " <<shmBuffer_->nClients() << " "
00383 << FUShmBuffer::shm_nattch(shmBuffer_->shmid()) << std::endl;
00384 if (shmBuffer_->nClients()==0&&
00385 FUShmBuffer::shm_nattch(shmBuffer_->shmid())==1) {
00386
00387 break;
00388 }
00389 else {
00390 count++;
00391 std::cout << " shutdown cycle attempt " << count << std::endl;
00392 LOG4CPLUS_DEBUG(log_,"FUResourceTable: Wait for all clients to detach,"
00393 <<" nClients="<<shmBuffer_->nClients()
00394 <<" nattch="<<FUShmBuffer::shm_nattch(shmBuffer_->shmid())
00395 <<" ("<<count<<")");
00396 ::usleep(shutdownTimeout_);
00397 if(count*shutdownTimeout_ > 10000000)
00398 LOG4CPLUS_WARN(log_,"FUResourceTable:LONG Wait (>10s) for all clients to detach,"
00399 <<" nClients="<<shmBuffer_->nClients()
00400 <<" nattch="<<FUShmBuffer::shm_nattch(shmBuffer_->shmid())
00401 <<" ("<<count<<")");
00402
00403 }
00404 }
00405 bool allEmpty = false;
00406 std::cout << "Checking if all dqm cells are empty " << std::endl;
00407 while(!allEmpty){
00408 UInt_t n=nbDqmCells_;
00409 allEmpty = true;
00410 shmBuffer_->lock();
00411 for (UInt_t i=0;i<n;i++) {
00412 dqm::State_t state=shmBuffer_->dqmState(i);
00413 if(state!=dqm::EMPTY) allEmpty = false;
00414 }
00415 shmBuffer_->unlock();
00416 }
00417 std::cout << "Making sure there are no dqm pending discards " << std::endl;
00418 if(nbPendingSMDqmDiscards_ != 0)
00419 {
00420 LOG4CPLUS_WARN(log_,"FUResourceTable: pending DQM discards not zero: ="
00421 << nbPendingSMDqmDiscards_ << " while cells are all empty. This may cause problems at next start ");
00422
00423 }
00424 shmBuffer_->writeDqmEmptyEvent();
00425 isReadyToShutDown_ = true;
00426
00427 }
00428
00429 return reschedule;
00430 }
00431
00432
00433
00434
00435 UInt_t FUResourceTable::allocateResource()
00436 {
00437 assert(!freeResourceIds_.empty());
00438
00439 lock();
00440 UInt_t fuResourceId=freeResourceIds_.front();
00441 freeResourceIds_.pop();
00442 nbPending_++;
00443 nbAllocated_++;
00444 unlock();
00445
00446 return fuResourceId;
00447 }
00448
00449
00450
00451 bool FUResourceTable::buildResource(MemRef_t* bufRef)
00452 {
00453 bool eventComplete=false;
00454
00455 I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block=
00456 (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)bufRef->getDataLocation();
00457
00458 UInt_t fuResourceId=(UInt_t)block->fuTransactionId;
00459 UInt_t buResourceId=(UInt_t)block->buResourceId;
00460 FUResource* resource =resources_[fuResourceId];
00461
00462
00463 if (!resource->fatalError()&&!resource->isAllocated()) {
00464 FUShmRawCell* cell=shmBuffer_->rawCellToWrite();
00465 resource->allocate(cell);
00466 timeval now;
00467 gettimeofday(&now,0);
00468
00469 frb_->setRBTimeStamp(((uint64_t)(now.tv_sec) << 32) + (uint64_t)(now.tv_usec));
00470
00471 frb_->setRBEventCount(nbCompleted_);
00472
00473 if (doCrcCheck_>0&&0==nbAllocated_%doCrcCheck_) resource->doCrcCheck(true);
00474 else resource->doCrcCheck(false);
00475 }
00476
00477
00478
00479 if (!resource->fatalError()) {
00480 resource->process(bufRef);
00481 lock();
00482 nbErrors_ +=resource->nbErrors();
00483 nbCrcErrors_+=resource->nbCrcErrors();
00484 unlock();
00485
00486
00487 if (resource->isComplete()) {
00488 lock();
00489 nbCompleted_++;
00490 nbPending_--;
00491 unlock();
00492 if (doDumpEvents_>0&&nbCompleted_%doDumpEvents_==0)
00493 dumpEvent(resource->shmCell());
00494 shmBuffer_->finishWritingRawCell(resource->shmCell());
00495 eventComplete=true;
00496 }
00497
00498 }
00499
00500
00501 if (resource->fatalError()) {
00502 bool lastMsg=isLastMessageOfEvent(bufRef);
00503 if (lastMsg) {
00504 shmBuffer_->releaseRawCell(resource->shmCell());
00505 resource->release();
00506 lock();
00507 freeResourceIds_.push(fuResourceId);
00508 nbDiscarded_++;
00509 nbLost_++;
00510 nbPending_--;
00511 unlock();
00512 bu_->sendDiscard(buResourceId);
00513 sendAllocate();
00514 }
00515 bufRef->release();
00516 }
00517
00518 return eventComplete;
00519 }
00520
00521
00522
00523 bool FUResourceTable::discardDataEvent(MemRef_t* bufRef)
00524 {
00525 I2O_FU_DATA_DISCARD_MESSAGE_FRAME *msg;
00526 msg=(I2O_FU_DATA_DISCARD_MESSAGE_FRAME*)bufRef->getDataLocation();
00527 UInt_t recoIndex=msg->rbBufferID;
00528
00529 if (acceptSMDataDiscard_[recoIndex]) {
00530 lock();
00531 nbPendingSMDiscards_--;
00532 unlock();
00533 acceptSMDataDiscard_[recoIndex] = false;
00534
00535 if (!isHalting_) {
00536 shmBuffer_->discardRecoCell(recoIndex);
00537 bufRef->release();
00538 }
00539 }
00540 else {
00541 LOG4CPLUS_ERROR(log_,"Spurious DATA discard by StorageManager, skip!");
00542 }
00543
00544 if (isHalting_) {
00545 bufRef->release();
00546 return false;
00547 }
00548
00549 return true;
00550 }
00551
00552
00553
00554 bool FUResourceTable::discardDqmEvent(MemRef_t* bufRef)
00555 {
00556 I2O_FU_DQM_DISCARD_MESSAGE_FRAME *msg;
00557 msg=(I2O_FU_DQM_DISCARD_MESSAGE_FRAME*)bufRef->getDataLocation();
00558 UInt_t dqmIndex=msg->rbBufferID;
00559 unsigned int ntries = 0;
00560 while(shmBuffer_->dqmState(dqmIndex)!=dqm::SENT){
00561 LOG4CPLUS_WARN(log_,"DQM discard for cell "<< dqmIndex << " which is not yer in SENT state - waiting");
00562 ::usleep(10000);
00563 if(ntries++>10){
00564 LOG4CPLUS_ERROR(log_,"DQM cell " << dqmIndex
00565 << " discard timed out while cell still in state " << shmBuffer_->dqmState(dqmIndex) );
00566 bufRef->release();
00567 return true;
00568 }
00569 }
00570 if (acceptSMDqmDiscard_[dqmIndex]>0) {
00571 acceptSMDqmDiscard_[dqmIndex]--;
00572 if(nbPendingSMDqmDiscards_>0){
00573 nbPendingSMDqmDiscards_--;
00574 }
00575 else {
00576 LOG4CPLUS_WARN(log_,"Spurious??? DQM discard by StorageManager, index " << dqmIndex
00577 << " cell state " << shmBuffer_->dqmState(dqmIndex) << " accept flag " << acceptSMDqmDiscard_[dqmIndex];);
00578 }
00579
00580 if (!isHalting_) {
00581 shmBuffer_->discardDqmCell(dqmIndex);
00582 bufRef->release();
00583 }
00584
00585 }
00586 else {
00587 LOG4CPLUS_ERROR(log_,"Spurious DQM discard for cell " << dqmIndex
00588 << " from StorageManager while cell is not accepting discards");
00589 }
00590
00591 if (isHalting_) {
00592 bufRef->release();
00593 return false;
00594 }
00595
00596 return true;
00597 }
00598
00599
00600
00601 void FUResourceTable::postEndOfLumiSection(MemRef_t* bufRef)
00602 {
00603 I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *msg =
00604 (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *)bufRef->getDataLocation();
00605
00606
00607
00608 for(unsigned int i = 0; i < nbRawCells_; i++)
00609 shmBuffer_->writeRawLumiSectionEvent(msg->lumiSection);
00610 }
00611
00612
00613
00614 void FUResourceTable::dropEvent()
00615 {
00616 FUShmRawCell* cell=shmBuffer_->rawCellToRead();
00617 UInt_t fuResourceId=cell->fuResourceId();
00618 shmBuffer_->finishReadingRawCell(cell);
00619 shmBuffer_->scheduleRawCellForDiscard(fuResourceId);
00620 }
00621
00622
00623
00624 bool FUResourceTable::handleCrashedEP(UInt_t runNumber,pid_t pid)
00625 {
00626 bool retval = false;
00627 vector<pid_t> pids=cellPrcIds();
00628 UInt_t iRawCell=pids.size();
00629 for (UInt_t i=0;i<pids.size();i++) { if (pid==pids[i]) { iRawCell=i; break; } }
00630
00631 if (iRawCell<pids.size()){
00632 shmBuffer_->writeErrorEventData(runNumber,pid,iRawCell);
00633 retval = true;
00634 }
00635 else
00636 LOG4CPLUS_WARN(log_,"No raw data to send to error stream for process " << pid);
00637 shmBuffer_->removeClientPrcId(pid);
00638 return retval;
00639 }
00640
00641
00642
00643 void FUResourceTable::dumpEvent(FUShmRawCell* cell)
00644 {
00645 ostringstream oss; oss<<"/tmp/evt"<<cell->evtNumber()<<".dump";
00646 ofstream fout(oss.str().c_str());
00647 fout.fill('0');
00648
00649 fout<<"#\n# evt "<<cell->evtNumber()<<"\n#\n"<<endl;
00650 for (unsigned int i=0;i<cell->nFed();i++) {
00651 if (cell->fedSize(i)==0) continue;
00652 fout<<"# fedid "<<i<<endl;
00653 unsigned char* addr=cell->fedAddr(i);
00654 for (unsigned int j=0;j<cell->fedSize(i);j++) {
00655 fout<<setiosflags(ios::right)<<setw(2)<<hex<<(int)(*addr)<<dec;
00656 if ((j+1)%8) fout<<" "; else fout<<endl;
00657 ++addr;
00658 }
00659 fout<<endl;
00660 }
00661 fout.close();
00662 }
00663
00664
00665
00666 void FUResourceTable::stop()
00667 {
00668 isStopping_ = true;
00669 shutDownClients();
00670 }
00671
00672
00673
00674 void FUResourceTable::halt()
00675 {
00676 isHalting_=true;
00677 shutDownClients();
00678 }
00679
00680
00681
00682 void FUResourceTable::shutDownClients()
00683 {
00684 nbClientsToShutDown_ = nbClients();
00685 isReadyToShutDown_ = false;
00686
00687 if (nbClientsToShutDown_==0) {
00688 LOG4CPLUS_INFO(log_,"No clients to shut down. Checking if there are raw cells not assigned to any process yet");
00689 UInt_t n=nbResources();
00690 for (UInt_t i=0;i<n;i++) {
00691 evt::State_t state=shmBuffer_->evtState(i);
00692 if (state!=evt::EMPTY){
00693 LOG4CPLUS_WARN(log_,"Schedule discard at STOP for orphaned event in state "
00694 << state);
00695 shmBuffer_->scheduleRawCellForDiscardServerSide(i);
00696 }
00697 }
00698 shmBuffer_->scheduleRawEmptyCellForDiscard();
00699 }
00700 else {
00701 UInt_t n=nbClientsToShutDown_;
00702 for (UInt_t i=0;i<n;++i) shmBuffer_->writeRawEmptyEvent();
00703 }
00704 }
00705
00706
00707
00708 void FUResourceTable::clear()
00709 {
00710 for (UInt_t i=0;i<resources_.size();i++) {
00711 resources_[i]->release();
00712 delete resources_[i];
00713 }
00714 resources_.clear();
00715 while (!freeResourceIds_.empty()) freeResourceIds_.pop();
00716 }
00717
00718
00719
00720 void FUResourceTable::resetCounters()
00721 {
00722 if (0!=shmBuffer_) {
00723 for (UInt_t i=0;i<shmBuffer_->nRecoCells();i++) acceptSMDataDiscard_[i]= false;
00724 for (UInt_t i=0;i<shmBuffer_->nDqmCells();i++) acceptSMDqmDiscard_[i] = 0;
00725 }
00726
00727 nbAllocated_ =nbPending_;
00728 nbCompleted_ =0;
00729 nbSent_ =0;
00730 nbSentError_ =0;
00731 nbSentDqm_ =0;
00732 nbPendingSMDiscards_ =0;
00733 nbPendingSMDqmDiscards_=0;
00734 nbDiscarded_ =0;
00735 nbLost_ =0;
00736
00737 nbErrors_ =0;
00738 nbCrcErrors_ =0;
00739 nbAllocSent_ =0;
00740
00741 sumOfSquares_ =0;
00742 sumOfSizes_ =0;
00743 isStopping_ =false;
00744 }
00745
00746
00747
00748 UInt_t FUResourceTable::nbClients() const
00749 {
00750 UInt_t result(0);
00751 if (0!=shmBuffer_) result=shmBuffer_->nClients();
00752 return result;
00753 }
00754
00755
00756
00757 vector<pid_t> FUResourceTable::clientPrcIds() const
00758 {
00759 vector<pid_t> result;
00760 if (0!=shmBuffer_) {
00761 UInt_t n=nbClients();
00762 for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->clientPrcId(i));
00763 }
00764 return result;
00765 }
00766
00767
00768
00769 string FUResourceTable::clientPrcIdsAsString() const
00770 {
00771 stringstream ss;
00772 if (0!=shmBuffer_) {
00773 UInt_t n=nbClients();
00774 for (UInt_t i=0;i<n;i++) {
00775 if (i>0) ss<<",";
00776 ss<<shmBuffer_->clientPrcId(i);
00777 }
00778 }
00779 return ss.str();
00780 }
00781
00782
00783
00784 vector<string> FUResourceTable::cellStates() const
00785 {
00786 vector<string> result;
00787 if (0!=shmBuffer_) {
00788 UInt_t n=nbResources();
00789 shmBuffer_->lock();
00790 for (UInt_t i=0;i<n;i++) {
00791 evt::State_t state=shmBuffer_->evtState(i);
00792 if (state==evt::EMPTY) result.push_back("EMPTY");
00793 else if (state==evt::STOP) result.push_back("STOP");
00794 else if (state==evt::RAWWRITING) result.push_back("RAWWRITING");
00795 else if (state==evt::RAWWRITTEN) result.push_back("RAWWRITTEN");
00796 else if (state==evt::RAWREADING) result.push_back("RAWREADING");
00797 else if (state==evt::RAWREAD) result.push_back("RAWREAD");
00798 else if (state==evt::PROCESSING) result.push_back("PROCESSING");
00799 else if (state==evt::PROCESSED) result.push_back("PROCESSED");
00800 else if (state==evt::RECOWRITING)result.push_back("RECOWRITING");
00801 else if (state==evt::RECOWRITTEN)result.push_back("RECOWRITTEN");
00802 else if (state==evt::SENDING) result.push_back("SENDING");
00803 else if (state==evt::SENT) result.push_back("SENT");
00804 else if (state==evt::DISCARDING) result.push_back("DISCARDING");
00805 }
00806 shmBuffer_->unlock();
00807 }
00808 return result;
00809 }
00810
00811 vector<string> FUResourceTable::dqmCellStates() const
00812 {
00813 vector<string> result;
00814 if (0!=shmBuffer_) {
00815 UInt_t n=nbDqmCells_;
00816 shmBuffer_->lock();
00817 for (UInt_t i=0;i<n;i++) {
00818 dqm::State_t state=shmBuffer_->dqmState(i);
00819 if (state==dqm::EMPTY) result.push_back("EMPTY");
00820 else if (state==dqm::WRITING) result.push_back("WRITING");
00821 else if (state==dqm::WRITTEN) result.push_back("WRITTEN");
00822 else if (state==dqm::SENDING) result.push_back("SENDING");
00823 else if (state==dqm::SENT) result.push_back("SENT");
00824 else if (state==dqm::DISCARDING) result.push_back("DISCARDING");
00825 }
00826 shmBuffer_->unlock();
00827 }
00828 return result;
00829 }
00830
00831
00832
00833 vector<UInt_t> FUResourceTable::cellEvtNumbers() const
00834 {
00835 vector<UInt_t> result;
00836 if (0!=shmBuffer_) {
00837 UInt_t n=nbResources();
00838 shmBuffer_->lock();
00839 for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->evtNumber(i));
00840 shmBuffer_->unlock();
00841 }
00842 return result;
00843 }
00844
00845
00846
00847 vector<pid_t> FUResourceTable::cellPrcIds() const
00848 {
00849 vector<pid_t> result;
00850 if (0!=shmBuffer_) {
00851 UInt_t n=nbResources();
00852 shmBuffer_->lock();
00853 for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->evtPrcId(i));
00854 shmBuffer_->unlock();
00855 }
00856 return result;
00857 }
00858
00859
00860
00861 vector<time_t> FUResourceTable::cellTimeStamps() const
00862 {
00863 vector<time_t> result;
00864 if (0!=shmBuffer_) {
00865 UInt_t n=nbResources();
00866 shmBuffer_->lock();
00867 for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->evtTimeStamp(i));
00868 shmBuffer_->unlock();
00869 }
00870 return result;
00871 }
00872
00873
00875
00877
00878
00879 void FUResourceTable::sendAllocate()
00880 {
00881 UInt_t nbFreeSlots = this->nbFreeSlots();
00882 UInt_t nbFreeSlotsMax = resources_.size()/2;
00883 if (nbFreeSlots>nbFreeSlotsMax) {
00884 UIntVec_t fuResourceIds;
00885 for (UInt_t i=0;i<nbFreeSlots;i++)
00886 fuResourceIds.push_back(allocateResource());
00887 bu_->sendAllocate(fuResourceIds);
00888 nbAllocSent_++;
00889 }
00890 }
00891
00892
00893
00894 void FUResourceTable::sendDiscard(UInt_t buResourceId)
00895 {
00896 bu_->sendDiscard(buResourceId);
00897 nbDiscarded_++;
00898 }
00899
00900
00901
00902 void FUResourceTable::sendInitMessage(UInt_t fuResourceId,
00903 UInt_t outModId,
00904 UInt_t fuProcessId,
00905 UInt_t fuGuid,
00906 UChar_t *data,
00907 UInt_t dataSize)
00908 {
00909 if (0==sm_) {
00910 LOG4CPLUS_ERROR(log_,"No StorageManager, DROP INIT MESSAGE!");
00911 }
00912 else {
00913 acceptSMDataDiscard_[fuResourceId] = true;
00914 UInt_t nbBytes=sm_->sendInitMessage(fuResourceId,outModId,fuProcessId,
00915 fuGuid,data,dataSize);
00916 sumOfSquares_+=(uint64_t)nbBytes*(uint64_t)nbBytes;
00917 sumOfSizes_ +=nbBytes;
00918 }
00919 }
00920
00921
00922
00923 void FUResourceTable::sendDataEvent(UInt_t fuResourceId,
00924 UInt_t runNumber,
00925 UInt_t evtNumber,
00926 UInt_t outModId,
00927 UInt_t fuProcessId,
00928 UInt_t fuGuid,
00929 UChar_t *data,
00930 UInt_t dataSize)
00931 {
00932 if (0==sm_) {
00933 LOG4CPLUS_ERROR(log_,"No StorageManager, DROP DATA EVENT!");
00934 }
00935 else {
00936 acceptSMDataDiscard_[fuResourceId] = true;
00937 UInt_t nbBytes=sm_->sendDataEvent(fuResourceId,runNumber,evtNumber,
00938 outModId,fuProcessId,fuGuid,
00939 data,dataSize);
00940 sumOfSquares_+=(uint64_t)nbBytes*(uint64_t)nbBytes;
00941 sumOfSizes_ +=nbBytes;
00942 }
00943 }
00944
00945
00946
00947 void FUResourceTable::sendErrorEvent(UInt_t fuResourceId,
00948 UInt_t runNumber,
00949 UInt_t evtNumber,
00950 UInt_t fuProcessId,
00951 UInt_t fuGuid,
00952 UChar_t *data,
00953 UInt_t dataSize)
00954 {
00955 if (0==sm_) {
00956 LOG4CPLUS_ERROR(log_,"No StorageManager, DROP ERROR EVENT!");
00957 }
00958 else {
00959 acceptSMDataDiscard_[fuResourceId] = true;
00960 UInt_t nbBytes=sm_->sendErrorEvent(fuResourceId,runNumber,evtNumber,
00961 fuProcessId,fuGuid,data,dataSize);
00962 sumOfSquares_+=(uint64_t)nbBytes*(uint64_t)nbBytes;
00963 sumOfSizes_ +=nbBytes;
00964 }
00965
00966
00967
00968
00969
00970
00971
00972
00973
00974
00975
00976
00977
00978
00979
00980
00981
00982
00983
00984
00985
00986
00987
00988 }
00989
00990
00991
00992 void FUResourceTable::sendDqmEvent(UInt_t fuDqmId,
00993 UInt_t runNumber,
00994 UInt_t evtAtUpdate,
00995 UInt_t folderId,
00996 UInt_t fuProcessId,
00997 UInt_t fuGuid,
00998 UChar_t* data,
00999 UInt_t dataSize)
01000 {
01001 if (0==sm_) {
01002 LOG4CPLUS_WARN(log_,"No StorageManager, DROP DQM EVENT.");
01003 }
01004 else {
01005 sm_->sendDqmEvent(fuDqmId,runNumber,evtAtUpdate,folderId,
01006 fuProcessId,fuGuid,data,dataSize);
01007
01008 nbPendingSMDqmDiscards_++;
01009
01010 acceptSMDqmDiscard_[fuDqmId]++;
01011 if(acceptSMDqmDiscard_[fuDqmId]>1)
01012 LOG4CPLUS_WARN(log_,"DQM Cell " << fuDqmId << " being sent more than once for folder "
01013 << folderId << " process " << fuProcessId << " guid " << fuGuid);
01014 nbSentDqm_++;
01015 }
01016 }
01017
01018
01019
01020 bool FUResourceTable::isLastMessageOfEvent(MemRef_t* bufRef)
01021 {
01022 while (0!=bufRef->getNextReference()) bufRef=bufRef->getNextReference();
01023
01024 I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block=
01025 (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)bufRef->getDataLocation();
01026
01027 UInt_t iBlock =block->blockNb;
01028 UInt_t nBlock =block->nbBlocksInSuperFragment;
01029 UInt_t iSuperFrag=block->superFragmentNb;
01030 UInt_t nSuperFrag=block->nbSuperFragmentsInEvent;
01031
01032 return ((iSuperFrag==nSuperFrag-1)&&(iBlock==nBlock-1));
01033 }
01034
01035
01036 void FUResourceTable::injectCRCError()
01037 {
01038 for (UInt_t i=0;i<resources_.size();i++) {
01039 resources_[i]->scheduleCRCError();
01040 }
01041 }
01042 void FUResourceTable::printWorkLoopStatus()
01043 {
01044 std::cout << "Workloop status===============" << std::endl;
01045 std::cout << "==============================" << std::endl;
01046 if(wlSendData_!=0)
01047 std::cout << "SendData -> " << wlSendData_->isActive() << std::endl;
01048 if(wlSendDqm_!=0)
01049 std::cout << "SendDqm -> " << wlSendDqm_->isActive() << std::endl;
01050 if(wlDiscard_!=0)
01051 std::cout << "Discard -> " << wlDiscard_->isActive() << std::endl;
01052 std::cout << "Workloops Active -> " << isActive_ << std::endl;
01053
01054 }
01055
01056
01057 void FUResourceTable::lastResort()
01058 {
01059 std::cout << "lastResort: " << shmBuffer_->nbRawCellsToRead()
01060 << " more rawcells to read " << std::endl;
01061 while(shmBuffer_->nbRawCellsToRead()!=0){
01062 FUShmRawCell* newCell=shmBuffer_->rawCellToRead();
01063 std::cout << "lastResort: " << shmBuffer_->nbRawCellsToRead() << std::endl;
01064 shmBuffer_->scheduleRawEmptyCellForDiscardServerSide(newCell);
01065 std::cout << "lastResort: schedule raw cell for discard" << std::endl;
01066 }
01067 }