00001
00002
00003
00004
00005
00006
00008
00009
00010 #include "EventFilter/ResourceBroker/interface/FUResourceTable.h"
00011 #include "FWCore/Utilities/interface/CRC16.h"
00012 #include "EvffedFillerRB.h"
00013
00014 #include "toolbox/task/WorkLoopFactory.h"
00015 #include "interface/evb/i2oEVBMsgs.h"
00016 #include "xcept/tools.h"
00017
00018
00019 #include <fstream>
00020 #include <sstream>
00021 #include <iomanip>
00022 #include <unistd.h>
00023
00024
00025 using namespace evf;
00026 using namespace std;
00027
00028
00030
00032
00033
00034 FUResourceTable::FUResourceTable(bool segmentationMode,
00035 UInt_t nbRawCells,
00036 UInt_t nbRecoCells,
00037 UInt_t nbDqmCells,
00038 UInt_t rawCellSize,
00039 UInt_t recoCellSize,
00040 UInt_t dqmCellSize,
00041 BUProxy *bu,
00042 SMProxy *sm,
00043 log4cplus::Logger logger,
00044 unsigned int timeout,
00045 EvffedFillerRB *frb,
00046 xdaq::Application*app)
00047 throw (evf::Exception)
00048 : bu_(bu)
00049 , sm_(sm)
00050 , log_(logger)
00051 , wlSendData_(0)
00052 , asSendData_(0)
00053 , wlSendDqm_(0)
00054 , asSendDqm_(0)
00055 , wlDiscard_(0)
00056 , asDiscard_(0)
00057 , shmBuffer_(0)
00058 , nbDqmCells_(nbDqmCells)
00059 , nbRawCells_(nbRawCells)
00060 , nbRecoCells_(nbRecoCells)
00061 , acceptSMDataDiscard_(0)
00062 , acceptSMDqmDiscard_(0)
00063 , doCrcCheck_(1)
00064 , shutdownTimeout_(timeout)
00065 , nbPending_(0)
00066 , nbClientsToShutDown_(0)
00067 , isReadyToShutDown_(true)
00068 , isActive_(false)
00069 , isHalting_(false)
00070 , isStopping_(false)
00071 , runNumber_(0xffffffff)
00072 , lock_(toolbox::BSem::FULL)
00073 , frb_(frb)
00074 , app_(app)
00075 {
00076 initialize(segmentationMode,
00077 nbRawCells,nbRecoCells,nbDqmCells,
00078 rawCellSize,recoCellSize,dqmCellSize);
00079 }
00080
00081
00082
00083 FUResourceTable::~FUResourceTable()
00084 {
00085 clear();
00086 if(wlSendData_){
00087 wlSendData_->cancel();
00088 toolbox::task::getWorkLoopFactory()->removeWorkLoop("SendData","waiting");
00089 }
00090 if(wlSendDqm_){
00091 wlSendDqm_->cancel();
00092 toolbox::task::getWorkLoopFactory()->removeWorkLoop("SendDqm","waiting");
00093 }
00094 if(wlDiscard_){
00095 wlDiscard_->cancel();
00096 toolbox::task::getWorkLoopFactory()->removeWorkLoop("Discard","waiting");
00097 }
00098 shmdt(shmBuffer_);
00099 if (FUShmBuffer::releaseSharedMemory())
00100 LOG4CPLUS_INFO(log_,"SHARED MEMORY SUCCESSFULLY RELEASED.");
00101 if (0!=acceptSMDataDiscard_) delete [] acceptSMDataDiscard_;
00102 if (0!= acceptSMDqmDiscard_) delete [] acceptSMDqmDiscard_;
00103 }
00104
00105
00107
00109
00110
00111 void FUResourceTable::initialize(bool segmentationMode,
00112 UInt_t nbRawCells,
00113 UInt_t nbRecoCells,
00114 UInt_t nbDqmCells,
00115 UInt_t rawCellSize,
00116 UInt_t recoCellSize,
00117 UInt_t dqmCellSize)
00118 throw (evf::Exception)
00119 {
00120 clear();
00121
00122 shmBuffer_=FUShmBuffer::createShmBuffer(segmentationMode,
00123 nbRawCells,nbRecoCells,nbDqmCells,
00124 rawCellSize,recoCellSize,dqmCellSize);
00125 if (0==shmBuffer_) {
00126 string msg = "CREATION OF SHARED MEMORY SEGMENT FAILED!";
00127 LOG4CPLUS_FATAL(log_,msg);
00128 XCEPT_RAISE(evf::Exception,msg);
00129 }
00130
00131 for (UInt_t i=0;i<nbRawCells_;i++) {
00132 resources_.push_back(new FUResource(i,log_,frb_,app_));
00133 freeResourceIds_.push(i);
00134 }
00135
00136 acceptSMDataDiscard_ = new bool[nbRecoCells];
00137 acceptSMDqmDiscard_ = new int[nbDqmCells];
00138
00139 resetCounters();
00140 }
00141
00142
00143
00144 void FUResourceTable::startSendDataWorkLoop() throw (evf::Exception)
00145 {
00146 try {
00147 wlSendData_=
00148 toolbox::task::getWorkLoopFactory()->getWorkLoop("SendData","waiting");
00149 if (!wlSendData_->isActive()) wlSendData_->activate();
00150 asSendData_=toolbox::task::bind(this,&FUResourceTable::sendData,"SendData");
00151 wlSendData_->submit(asSendData_);
00152 }
00153 catch (xcept::Exception& e) {
00154 string msg = "Failed to start workloop 'SendData'.";
00155 XCEPT_RETHROW(evf::Exception,msg,e);
00156 }
00157 }
00158
00159
00160
00161 bool FUResourceTable::sendData(toolbox::task::WorkLoop* )
00162 {
00163 bool reschedule=true;
00164
00165 FUShmRecoCell* cell=shmBuffer_->recoCellToRead();
00166
00167 if (0==cell->eventSize()) {
00168 LOG4CPLUS_INFO(log_,"Don't reschedule sendData workloop.");
00169 UInt_t cellIndex=cell->index();
00170 shmBuffer_->finishReadingRecoCell(cell);
00171 shmBuffer_->discardRecoCell(cellIndex);
00172 reschedule=false;
00173 }
00174 else if (isHalting_) {
00175 LOG4CPLUS_INFO(log_,"sendData: isHalting, discard recoCell.");
00176 UInt_t cellIndex=cell->index();
00177 shmBuffer_->finishReadingRecoCell(cell);
00178 shmBuffer_->discardRecoCell(cellIndex);
00179 }
00180 else {
00181 try {
00182 if (cell->type()==0) {
00183 UInt_t cellIndex = cell->index();
00184 UInt_t cellOutModId = cell->outModId();
00185 UInt_t cellFUProcId = cell->fuProcessId();
00186 UInt_t cellFUGuid = cell->fuGuid();
00187 UChar_t* cellPayloadAddr = cell->payloadAddr();
00188 UInt_t cellEventSize = cell->eventSize();
00189 shmBuffer_->finishReadingRecoCell(cell);
00190
00191 lock();
00192 nbPendingSMDiscards_++;
00193 unlock();
00194
00195 sendInitMessage(cellIndex,cellOutModId,cellFUProcId,cellFUGuid,
00196 cellPayloadAddr,cellEventSize);
00197 }
00198 else if (cell->type()==1) {
00199 UInt_t cellIndex = cell->index();
00200 UInt_t cellRawIndex = cell->rawCellIndex();
00201 UInt_t cellRunNumber = cell->runNumber();
00202 UInt_t cellEvtNumber = cell->evtNumber();
00203 UInt_t cellOutModId = cell->outModId();
00204 UInt_t cellFUProcId = cell->fuProcessId();
00205 UInt_t cellFUGuid = cell->fuGuid();
00206 UChar_t *cellPayloadAddr = cell->payloadAddr();
00207 UInt_t cellEventSize = cell->eventSize();
00208 shmBuffer_->finishReadingRecoCell(cell);
00209
00210 lock();
00211 nbPendingSMDiscards_++;
00212 resources_[cellRawIndex]->incNbSent();
00213 if (resources_[cellRawIndex]->nbSent()==1) nbSent_++;
00214 unlock();
00215
00216 sendDataEvent(cellIndex,cellRunNumber,cellEvtNumber,cellOutModId,
00217 cellFUProcId,cellFUGuid,cellPayloadAddr,cellEventSize);
00218 }
00219 else if (cell->type()==2) {
00220 UInt_t cellIndex = cell->index();
00221 UInt_t cellRawIndex = cell->rawCellIndex();
00222
00223 UInt_t cellEvtNumber = cell->evtNumber();
00224 UInt_t cellFUProcId = cell->fuProcessId();
00225 UInt_t cellFUGuid = cell->fuGuid();
00226 UChar_t *cellPayloadAddr = cell->payloadAddr();
00227 UInt_t cellEventSize = cell->eventSize();
00228 shmBuffer_->finishReadingRecoCell(cell);
00229
00230 lock();
00231 nbPendingSMDiscards_++;
00232 resources_[cellRawIndex]->incNbSent();
00233 if (resources_[cellRawIndex]->nbSent()==1) { nbSent_++; nbSentError_++; }
00234 unlock();
00235
00236 sendErrorEvent(cellIndex,runNumber_,cellEvtNumber,
00237 cellFUProcId,cellFUGuid,cellPayloadAddr,cellEventSize);
00238 }
00239 else {
00240 string errmsg="Unknown RecoCell type (neither INIT/DATA/ERROR).";
00241 XCEPT_RAISE(evf::Exception,errmsg);
00242 }
00243 }
00244 catch (xcept::Exception& e) {
00245 LOG4CPLUS_FATAL(log_,"Failed to send EVENT DATA to StorageManager: "
00246 <<xcept::stdformat_exception_history(e));
00247 reschedule=false;
00248 }
00249 }
00250
00251 return reschedule;
00252 }
00253
00254
00255
00256 void FUResourceTable::startSendDqmWorkLoop() throw (evf::Exception)
00257 {
00258 try {
00259 wlSendDqm_=toolbox::task::getWorkLoopFactory()->getWorkLoop("SendDqm","waiting");
00260 if (!wlSendDqm_->isActive()) wlSendDqm_->activate();
00261 asSendDqm_=toolbox::task::bind(this,&FUResourceTable::sendDqm,"SendDqm");
00262 wlSendDqm_->submit(asSendDqm_);
00263 }
00264 catch (xcept::Exception& e) {
00265 string msg = "Failed to start workloop 'SendDqm'.";
00266 XCEPT_RETHROW(evf::Exception,msg,e);
00267 }
00268 }
00269
00270
00271
00272 bool FUResourceTable::sendDqm(toolbox::task::WorkLoop* )
00273 {
00274 bool reschedule=true;
00275
00276 FUShmDqmCell* cell=shmBuffer_->dqmCellToRead();
00277 dqm::State_t state=shmBuffer_->dqmState(cell->index());
00278
00279 if (state==dqm::EMPTY) {
00280 LOG4CPLUS_WARN(log_,"Don't reschedule sendDqm workloop.");
00281 std::cout << "shut down dqm workloop " << std::endl;
00282 UInt_t cellIndex=cell->index();
00283 shmBuffer_->finishReadingDqmCell(cell);
00284 shmBuffer_->discardDqmCell(cellIndex);
00285 reschedule=false;
00286 }
00287 else if (isHalting_) {
00288 UInt_t cellIndex=cell->index();
00289 shmBuffer_->finishReadingDqmCell(cell);
00290 shmBuffer_->discardDqmCell(cellIndex);
00291 }
00292 else {
00293 try {
00294 UInt_t cellIndex = cell->index();
00295 UInt_t cellRunNumber = cell->runNumber();
00296 UInt_t cellEvtAtUpdate = cell->evtAtUpdate();
00297 UInt_t cellFolderId = cell->folderId();
00298 UInt_t cellFUProcId = cell->fuProcessId();
00299 UInt_t cellFUGuid = cell->fuGuid();
00300 UChar_t *cellPayloadAddr = cell->payloadAddr();
00301 UInt_t cellEventSize = cell->eventSize();
00302 sendDqmEvent(cellIndex,cellRunNumber,cellEvtAtUpdate,cellFolderId,
00303 cellFUProcId,cellFUGuid,cellPayloadAddr,cellEventSize);
00304 shmBuffer_->finishReadingDqmCell(cell);
00305 }
00306 catch (xcept::Exception& e) {
00307 LOG4CPLUS_FATAL(log_,"Failed to send DQM DATA to StorageManager: "
00308 <<xcept::stdformat_exception_history(e));
00309 reschedule=false;
00310 }
00311 }
00312
00313 return reschedule;
00314 }
00315
00316
00317
00318 void FUResourceTable::startDiscardWorkLoop() throw (evf::Exception)
00319 {
00320 try {
00321 LOG4CPLUS_INFO(log_,"Start 'discard' workloop.");
00322 wlDiscard_=toolbox::task::getWorkLoopFactory()->getWorkLoop("Discard","waiting");
00323 if (!wlDiscard_->isActive()) wlDiscard_->activate();
00324 asDiscard_=toolbox::task::bind(this,&FUResourceTable::discard,"Discard");
00325 wlDiscard_->submit(asDiscard_);
00326 isActive_=true;
00327 }
00328 catch (xcept::Exception& e) {
00329 string msg = "Failed to start workloop 'Discard'.";
00330 XCEPT_RETHROW(evf::Exception,msg,e);
00331 }
00332 isReadyToShutDown_=false;
00333 }
00334
00335
00336
00337 bool FUResourceTable::discard(toolbox::task::WorkLoop* )
00338 {
00339 FUShmRawCell* cell =shmBuffer_->rawCellToDiscard();
00340 evt::State_t state=shmBuffer_->evtState(cell->index());
00341
00342 bool reschedule =true;
00343 bool shutDown =(state==evt::STOP);
00344 bool isLumi =(state==evt::USEDLS);
00345 UInt_t fuResourceId=cell->fuResourceId();
00346 UInt_t buResourceId=cell->buResourceId();
00347
00348
00349
00350
00351
00352 if (shutDown) {
00353 LOG4CPLUS_INFO(log_,"nbClientsToShutDown = "<<nbClientsToShutDown_);
00354 if (nbClientsToShutDown_>0) --nbClientsToShutDown_;
00355 if (nbClientsToShutDown_==0) {
00356 LOG4CPLUS_INFO(log_,"Don't reschedule discard-workloop.");
00357 isActive_ =false;
00358 reschedule=false;
00359 }
00360 }
00361
00362 shmBuffer_->discardRawCell(cell);
00363 if(isLumi) nbEolDiscarded_++;
00364
00365 if (!shutDown && !isLumi) {
00366 if(fuResourceId >= nbResources()){
00367 LOG4CPLUS_WARN(log_,"cell " << cell->index() << " in state " << state
00368 << " scheduled for discard has no associated FU resource ");
00369 }
00370 else{
00371 resources_[fuResourceId]->release();
00372 lock();
00373 freeResourceIds_.push(fuResourceId);
00374 assert(freeResourceIds_.size()<=resources_.size());
00375 unlock();
00376
00377 if (!isHalting_) {
00378 sendDiscard(buResourceId);
00379 if(!isStopping_)sendAllocate();
00380 }
00381 }
00382 }
00383
00384 if (!reschedule) {
00385 std::cout << " entered shutdown cycle " << std::endl;
00386 shmBuffer_->writeRecoEmptyEvent();
00387 UInt_t count=0;
00388 while (count<100) {
00389 std::cout << " shutdown cycle " <<shmBuffer_->nClients() << " "
00390 << FUShmBuffer::shm_nattch(shmBuffer_->shmid()) << std::endl;
00391 if (shmBuffer_->nClients()==0&&
00392 FUShmBuffer::shm_nattch(shmBuffer_->shmid())==1) {
00393
00394 break;
00395 }
00396 else {
00397 count++;
00398 std::cout << " shutdown cycle attempt " << count << std::endl;
00399 LOG4CPLUS_DEBUG(log_,"FUResourceTable: Wait for all clients to detach,"
00400 <<" nClients="<<shmBuffer_->nClients()
00401 <<" nattch="<<FUShmBuffer::shm_nattch(shmBuffer_->shmid())
00402 <<" ("<<count<<")");
00403 ::usleep(shutdownTimeout_);
00404 if(count*shutdownTimeout_ > 10000000)
00405 LOG4CPLUS_WARN(log_,"FUResourceTable:LONG Wait (>10s) for all clients to detach,"
00406 <<" nClients="<<shmBuffer_->nClients()
00407 <<" nattch="<<FUShmBuffer::shm_nattch(shmBuffer_->shmid())
00408 <<" ("<<count<<")");
00409
00410 }
00411 }
00412 bool allEmpty = false;
00413 std::cout << "Checking if all dqm cells are empty " << std::endl;
00414 while(!allEmpty){
00415 UInt_t n=nbDqmCells_;
00416 allEmpty = true;
00417 shmBuffer_->lock();
00418 for (UInt_t i=0;i<n;i++) {
00419 dqm::State_t state=shmBuffer_->dqmState(i);
00420 if(state!=dqm::EMPTY) allEmpty = false;
00421 }
00422 shmBuffer_->unlock();
00423 }
00424 std::cout << "Making sure there are no dqm pending discards " << std::endl;
00425 if(nbPendingSMDqmDiscards_ != 0)
00426 {
00427 LOG4CPLUS_WARN(log_,"FUResourceTable: pending DQM discards not zero: ="
00428 << nbPendingSMDqmDiscards_ << " while cells are all empty. This may cause problems at next start ");
00429
00430 }
00431 shmBuffer_->writeDqmEmptyEvent();
00432 isReadyToShutDown_ = true;
00433
00434 }
00435
00436 return reschedule;
00437 }
00438
00439
00440
00441
00442 UInt_t FUResourceTable::allocateResource()
00443 {
00444 assert(!freeResourceIds_.empty());
00445
00446 lock();
00447 UInt_t fuResourceId=freeResourceIds_.front();
00448 freeResourceIds_.pop();
00449 nbPending_++;
00450 nbAllocated_++;
00451 unlock();
00452
00453 return fuResourceId;
00454 }
00455
00456
00457
00458 bool FUResourceTable::buildResource(MemRef_t* bufRef)
00459 {
00460 bool eventComplete=false;
00461
00462 I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block=
00463 (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)bufRef->getDataLocation();
00464
00465 UInt_t fuResourceId=(UInt_t)block->fuTransactionId;
00466 UInt_t buResourceId=(UInt_t)block->buResourceId;
00467 FUResource* resource =resources_[fuResourceId];
00468
00469
00470 if (!resource->fatalError()&&!resource->isAllocated()) {
00471 FUShmRawCell* cell=shmBuffer_->rawCellToWrite();
00472 resource->allocate(cell);
00473 timeval now;
00474 gettimeofday(&now,0);
00475
00476 frb_->setRBTimeStamp(((uint64_t)(now.tv_sec) << 32) + (uint64_t)(now.tv_usec));
00477
00478 frb_->setRBEventCount(nbCompleted_);
00479
00480 if (doCrcCheck_>0&&0==nbAllocated_%doCrcCheck_) resource->doCrcCheck(true);
00481 else resource->doCrcCheck(false);
00482 }
00483
00484
00485
00486 if (!resource->fatalError()) {
00487 resource->process(bufRef);
00488 lock();
00489 nbErrors_ +=resource->nbErrors();
00490 nbCrcErrors_+=resource->nbCrcErrors();
00491 unlock();
00492
00493
00494 if (resource->isComplete()) {
00495 lock();
00496 nbCompleted_++;
00497 nbPending_--;
00498 unlock();
00499 if (doDumpEvents_>0&&nbCompleted_%doDumpEvents_==0)
00500 dumpEvent(resource->shmCell());
00501 shmBuffer_->finishWritingRawCell(resource->shmCell());
00502 eventComplete=true;
00503 }
00504
00505 }
00506
00507
00508 if (resource->fatalError()) {
00509 bool lastMsg=isLastMessageOfEvent(bufRef);
00510 if (lastMsg) {
00511 shmBuffer_->releaseRawCell(resource->shmCell());
00512 resource->release();
00513 lock();
00514 freeResourceIds_.push(fuResourceId);
00515 nbDiscarded_++;
00516 nbLost_++;
00517 nbPending_--;
00518 unlock();
00519 bu_->sendDiscard(buResourceId);
00520 sendAllocate();
00521 }
00522 bufRef->release();
00523 }
00524
00525 return eventComplete;
00526 }
00527
00528
00529
00530 bool FUResourceTable::discardDataEvent(MemRef_t* bufRef)
00531 {
00532 I2O_FU_DATA_DISCARD_MESSAGE_FRAME *msg;
00533 msg=(I2O_FU_DATA_DISCARD_MESSAGE_FRAME*)bufRef->getDataLocation();
00534 UInt_t recoIndex=msg->rbBufferID;
00535
00536 if (acceptSMDataDiscard_[recoIndex]) {
00537 lock();
00538 nbPendingSMDiscards_--;
00539 unlock();
00540 acceptSMDataDiscard_[recoIndex] = false;
00541
00542 if (!isHalting_) {
00543 shmBuffer_->discardRecoCell(recoIndex);
00544 bufRef->release();
00545 }
00546 }
00547 else {
00548 LOG4CPLUS_ERROR(log_,"Spurious DATA discard by StorageManager, skip!");
00549 }
00550
00551 if (isHalting_) {
00552 bufRef->release();
00553 return false;
00554 }
00555
00556 return true;
00557 }
00558
00559
00560
00561 bool FUResourceTable::discardDqmEvent(MemRef_t* bufRef)
00562 {
00563 I2O_FU_DQM_DISCARD_MESSAGE_FRAME *msg;
00564 msg=(I2O_FU_DQM_DISCARD_MESSAGE_FRAME*)bufRef->getDataLocation();
00565 UInt_t dqmIndex=msg->rbBufferID;
00566 unsigned int ntries = 0;
00567 while(shmBuffer_->dqmState(dqmIndex)!=dqm::SENT){
00568 LOG4CPLUS_WARN(log_,"DQM discard for cell "<< dqmIndex << " which is not yer in SENT state - waiting");
00569 ::usleep(10000);
00570 if(ntries++>10){
00571 LOG4CPLUS_ERROR(log_,"DQM cell " << dqmIndex
00572 << " discard timed out while cell still in state " << shmBuffer_->dqmState(dqmIndex) );
00573 bufRef->release();
00574 return true;
00575 }
00576 }
00577 if (acceptSMDqmDiscard_[dqmIndex]>0) {
00578 acceptSMDqmDiscard_[dqmIndex]--;
00579 if(nbPendingSMDqmDiscards_>0){
00580 nbPendingSMDqmDiscards_--;
00581 }
00582 else {
00583 LOG4CPLUS_WARN(log_,"Spurious??? DQM discard by StorageManager, index " << dqmIndex
00584 << " cell state " << shmBuffer_->dqmState(dqmIndex) << " accept flag " << acceptSMDqmDiscard_[dqmIndex];);
00585 }
00586
00587 if (!isHalting_) {
00588 shmBuffer_->discardDqmCell(dqmIndex);
00589 bufRef->release();
00590 }
00591
00592 }
00593 else {
00594 LOG4CPLUS_ERROR(log_,"Spurious DQM discard for cell " << dqmIndex
00595 << " from StorageManager while cell is not accepting discards");
00596 }
00597
00598 if (isHalting_) {
00599 bufRef->release();
00600 return false;
00601 }
00602
00603 return true;
00604 }
00605
00606
00607
00608 void FUResourceTable::postEndOfLumiSection(MemRef_t* bufRef)
00609 {
00610 I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *msg =
00611 (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *)bufRef->getDataLocation();
00612
00613
00614
00615 for(unsigned int i = 0; i < nbRawCells_; i++)
00616 {
00617 nbEolPosted_++;
00618 shmBuffer_->writeRawLumiSectionEvent(msg->lumiSection);
00619 }
00620 }
00621
00622
00623
00624 void FUResourceTable::dropEvent()
00625 {
00626 FUShmRawCell* cell=shmBuffer_->rawCellToRead();
00627 UInt_t fuResourceId=cell->fuResourceId();
00628 shmBuffer_->finishReadingRawCell(cell);
00629 shmBuffer_->scheduleRawCellForDiscard(fuResourceId);
00630 }
00631
00632
00633
00634 bool FUResourceTable::handleCrashedEP(UInt_t runNumber,pid_t pid)
00635 {
00636 bool retval = false;
00637 vector<pid_t> pids=cellPrcIds();
00638 UInt_t iRawCell=pids.size();
00639 for (UInt_t i=0;i<pids.size();i++) { if (pid==pids[i]) { iRawCell=i; break; } }
00640
00641 if (iRawCell<pids.size()){
00642 shmBuffer_->writeErrorEventData(runNumber,pid,iRawCell);
00643 retval = true;
00644 }
00645 else
00646 LOG4CPLUS_WARN(log_,"No raw data to send to error stream for process " << pid);
00647 shmBuffer_->removeClientPrcId(pid);
00648 return retval;
00649 }
00650
00651
00652
00653 void FUResourceTable::dumpEvent(FUShmRawCell* cell)
00654 {
00655 ostringstream oss; oss<<"/tmp/evt"<<cell->evtNumber()<<".dump";
00656 ofstream fout(oss.str().c_str());
00657 fout.fill('0');
00658
00659 fout<<"#\n# evt "<<cell->evtNumber()<<"\n#\n"<<endl;
00660 for (unsigned int i=0;i<cell->nFed();i++) {
00661 if (cell->fedSize(i)==0) continue;
00662 fout<<"# fedid "<<i<<endl;
00663 unsigned char* addr=cell->fedAddr(i);
00664 for (unsigned int j=0;j<cell->fedSize(i);j++) {
00665 fout<<setiosflags(ios::right)<<setw(2)<<hex<<(int)(*addr)<<dec;
00666 if ((j+1)%8) fout<<" "; else fout<<endl;
00667 ++addr;
00668 }
00669 fout<<endl;
00670 }
00671 fout.close();
00672 }
00673
00674
00675
00676 void FUResourceTable::stop()
00677 {
00678 isStopping_ = true;
00679 shutDownClients();
00680 }
00681
00682
00683
00684 void FUResourceTable::halt()
00685 {
00686 isHalting_=true;
00687 shutDownClients();
00688 }
00689
00690
00691
00692 void FUResourceTable::shutDownClients()
00693 {
00694 nbClientsToShutDown_ = nbClients();
00695 isReadyToShutDown_ = false;
00696
00697 if (nbClientsToShutDown_==0) {
00698 LOG4CPLUS_INFO(log_,"No clients to shut down. Checking if there are raw cells not assigned to any process yet");
00699 UInt_t n=nbResources();
00700 for (UInt_t i=0;i<n;i++) {
00701 evt::State_t state=shmBuffer_->evtState(i);
00702 if (state!=evt::EMPTY){
00703 LOG4CPLUS_WARN(log_,"Schedule discard at STOP for orphaned event in state "
00704 << state);
00705 shmBuffer_->scheduleRawCellForDiscardServerSide(i);
00706 }
00707 }
00708 shmBuffer_->scheduleRawEmptyCellForDiscard();
00709 }
00710 else {
00711 UInt_t n=nbClientsToShutDown_;
00712 for (UInt_t i=0;i<n;++i) shmBuffer_->writeRawEmptyEvent();
00713 }
00714 }
00715
00716
00717
00718 void FUResourceTable::clear()
00719 {
00720 for (UInt_t i=0;i<resources_.size();i++) {
00721 resources_[i]->release();
00722 delete resources_[i];
00723 }
00724 resources_.clear();
00725 while (!freeResourceIds_.empty()) freeResourceIds_.pop();
00726 }
00727
00728
00729
00730 void FUResourceTable::resetCounters()
00731 {
00732 if (0!=shmBuffer_) {
00733 for (UInt_t i=0;i<shmBuffer_->nRecoCells();i++) acceptSMDataDiscard_[i]= false;
00734 for (UInt_t i=0;i<shmBuffer_->nDqmCells();i++) acceptSMDqmDiscard_[i] = 0;
00735 }
00736
00737 nbAllocated_ =nbPending_;
00738 nbCompleted_ =0;
00739 nbSent_ =0;
00740 nbSentError_ =0;
00741 nbSentDqm_ =0;
00742 nbPendingSMDiscards_ =0;
00743 nbPendingSMDqmDiscards_=0;
00744 nbDiscarded_ =0;
00745 nbLost_ =0;
00746 nbEolPosted_ =0;
00747 nbEolDiscarded_ =0;
00748
00749 nbErrors_ =0;
00750 nbCrcErrors_ =0;
00751 nbAllocSent_ =0;
00752
00753 sumOfSquares_ =0;
00754 sumOfSizes_ =0;
00755 isStopping_ =false;
00756 }
00757
00758
00759
00760 UInt_t FUResourceTable::nbClients() const
00761 {
00762 UInt_t result(0);
00763 if (0!=shmBuffer_) result=shmBuffer_->nClients();
00764 return result;
00765 }
00766
00767
00768
00769 vector<pid_t> FUResourceTable::clientPrcIds() const
00770 {
00771 vector<pid_t> result;
00772 if (0!=shmBuffer_) {
00773 UInt_t n=nbClients();
00774 for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->clientPrcId(i));
00775 }
00776 return result;
00777 }
00778
00779
00780
00781 string FUResourceTable::clientPrcIdsAsString() const
00782 {
00783 stringstream ss;
00784 if (0!=shmBuffer_) {
00785 UInt_t n=nbClients();
00786 for (UInt_t i=0;i<n;i++) {
00787 if (i>0) ss<<",";
00788 ss<<shmBuffer_->clientPrcId(i);
00789 }
00790 }
00791 return ss.str();
00792 }
00793
00794
00795
00796 vector<string> FUResourceTable::cellStates() const
00797 {
00798 vector<string> result;
00799 if (0!=shmBuffer_) {
00800 UInt_t n=nbResources();
00801 shmBuffer_->lock();
00802 for (UInt_t i=0;i<n;i++) {
00803 evt::State_t state=shmBuffer_->evtState(i);
00804 if (state==evt::EMPTY) result.push_back("EMPTY");
00805 else if (state==evt::STOP) result.push_back("STOP");
00806 else if (state==evt::LUMISECTION)result.push_back("LUMISECTION");
00807 else if (state==evt::USEDLS) result.push_back("USEDLS");
00808 else if (state==evt::RAWWRITING) result.push_back("RAWWRITING");
00809 else if (state==evt::RAWWRITTEN) result.push_back("RAWWRITTEN");
00810 else if (state==evt::RAWREADING) result.push_back("RAWREADING");
00811 else if (state==evt::RAWREAD) result.push_back("RAWREAD");
00812 else if (state==evt::PROCESSING) result.push_back("PROCESSING");
00813 else if (state==evt::PROCESSED) result.push_back("PROCESSED");
00814 else if (state==evt::RECOWRITING)result.push_back("RECOWRITING");
00815 else if (state==evt::RECOWRITTEN)result.push_back("RECOWRITTEN");
00816 else if (state==evt::SENDING) result.push_back("SENDING");
00817 else if (state==evt::SENT) result.push_back("SENT");
00818 else if (state==evt::DISCARDING) result.push_back("DISCARDING");
00819 }
00820 shmBuffer_->unlock();
00821 }
00822 return result;
00823 }
00824
00825 vector<string> FUResourceTable::dqmCellStates() const
00826 {
00827 vector<string> result;
00828 if (0!=shmBuffer_) {
00829 UInt_t n=nbDqmCells_;
00830 shmBuffer_->lock();
00831 for (UInt_t i=0;i<n;i++) {
00832 dqm::State_t state=shmBuffer_->dqmState(i);
00833 if (state==dqm::EMPTY) result.push_back("EMPTY");
00834 else if (state==dqm::WRITING) result.push_back("WRITING");
00835 else if (state==dqm::WRITTEN) result.push_back("WRITTEN");
00836 else if (state==dqm::SENDING) result.push_back("SENDING");
00837 else if (state==dqm::SENT) result.push_back("SENT");
00838 else if (state==dqm::DISCARDING) result.push_back("DISCARDING");
00839 }
00840 shmBuffer_->unlock();
00841 }
00842 return result;
00843 }
00844
00845
00846
00847 vector<UInt_t> FUResourceTable::cellEvtNumbers() const
00848 {
00849 vector<UInt_t> result;
00850 if (0!=shmBuffer_) {
00851 UInt_t n=nbResources();
00852 shmBuffer_->lock();
00853 for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->evtNumber(i));
00854 shmBuffer_->unlock();
00855 }
00856 return result;
00857 }
00858
00859
00860
00861 vector<pid_t> FUResourceTable::cellPrcIds() const
00862 {
00863 vector<pid_t> result;
00864 if (0!=shmBuffer_) {
00865 UInt_t n=nbResources();
00866 shmBuffer_->lock();
00867 for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->evtPrcId(i));
00868 shmBuffer_->unlock();
00869 }
00870 return result;
00871 }
00872
00873
00874
00875 vector<time_t> FUResourceTable::cellTimeStamps() const
00876 {
00877 vector<time_t> result;
00878 if (0!=shmBuffer_) {
00879 UInt_t n=nbResources();
00880 shmBuffer_->lock();
00881 for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->evtTimeStamp(i));
00882 shmBuffer_->unlock();
00883 }
00884 return result;
00885 }
00886
00887
00889
00891
00892
00893 void FUResourceTable::sendAllocate()
00894 {
00895 UInt_t nbFreeSlots = this->nbFreeSlots();
00896 UInt_t nbFreeSlotsMax = resources_.size()/2;
00897 if (nbFreeSlots>nbFreeSlotsMax) {
00898 UIntVec_t fuResourceIds;
00899 for (UInt_t i=0;i<nbFreeSlots;i++)
00900 fuResourceIds.push_back(allocateResource());
00901 bu_->sendAllocate(fuResourceIds);
00902 nbAllocSent_++;
00903 }
00904 }
00905
00906
00907
00908 void FUResourceTable::sendDiscard(UInt_t buResourceId)
00909 {
00910 bu_->sendDiscard(buResourceId);
00911 nbDiscarded_++;
00912 }
00913
00914
00915
00916 void FUResourceTable::sendInitMessage(UInt_t fuResourceId,
00917 UInt_t outModId,
00918 UInt_t fuProcessId,
00919 UInt_t fuGuid,
00920 UChar_t *data,
00921 UInt_t dataSize)
00922 {
00923 if (0==sm_) {
00924 LOG4CPLUS_ERROR(log_,"No StorageManager, DROP INIT MESSAGE!");
00925 }
00926 else {
00927 acceptSMDataDiscard_[fuResourceId] = true;
00928 UInt_t nbBytes=sm_->sendInitMessage(fuResourceId,outModId,fuProcessId,
00929 fuGuid,data,dataSize);
00930 sumOfSquares_+=(uint64_t)nbBytes*(uint64_t)nbBytes;
00931 sumOfSizes_ +=nbBytes;
00932 }
00933 }
00934
00935
00936
00937 void FUResourceTable::sendDataEvent(UInt_t fuResourceId,
00938 UInt_t runNumber,
00939 UInt_t evtNumber,
00940 UInt_t outModId,
00941 UInt_t fuProcessId,
00942 UInt_t fuGuid,
00943 UChar_t *data,
00944 UInt_t dataSize)
00945 {
00946 if (0==sm_) {
00947 LOG4CPLUS_ERROR(log_,"No StorageManager, DROP DATA EVENT!");
00948 }
00949 else {
00950 acceptSMDataDiscard_[fuResourceId] = true;
00951 UInt_t nbBytes=sm_->sendDataEvent(fuResourceId,runNumber,evtNumber,
00952 outModId,fuProcessId,fuGuid,
00953 data,dataSize);
00954 sumOfSquares_+=(uint64_t)nbBytes*(uint64_t)nbBytes;
00955 sumOfSizes_ +=nbBytes;
00956 }
00957 }
00958
00959
00960
00961 void FUResourceTable::sendErrorEvent(UInt_t fuResourceId,
00962 UInt_t runNumber,
00963 UInt_t evtNumber,
00964 UInt_t fuProcessId,
00965 UInt_t fuGuid,
00966 UChar_t *data,
00967 UInt_t dataSize)
00968 {
00969 if (0==sm_) {
00970 LOG4CPLUS_ERROR(log_,"No StorageManager, DROP ERROR EVENT!");
00971 }
00972 else {
00973 acceptSMDataDiscard_[fuResourceId] = true;
00974 UInt_t nbBytes=sm_->sendErrorEvent(fuResourceId,runNumber,evtNumber,
00975 fuProcessId,fuGuid,data,dataSize);
00976 sumOfSquares_+=(uint64_t)nbBytes*(uint64_t)nbBytes;
00977 sumOfSizes_ +=nbBytes;
00978 }
00979
00980
00981
00982
00983
00984
00985
00986
00987
00988
00989
00990
00991
00992
00993
00994
00995
00996
00997
00998
00999
01000
01001
01002 }
01003
01004
01005
01006 void FUResourceTable::sendDqmEvent(UInt_t fuDqmId,
01007 UInt_t runNumber,
01008 UInt_t evtAtUpdate,
01009 UInt_t folderId,
01010 UInt_t fuProcessId,
01011 UInt_t fuGuid,
01012 UChar_t* data,
01013 UInt_t dataSize)
01014 {
01015 if (0==sm_) {
01016 LOG4CPLUS_WARN(log_,"No StorageManager, DROP DQM EVENT.");
01017 }
01018 else {
01019 sm_->sendDqmEvent(fuDqmId,runNumber,evtAtUpdate,folderId,
01020 fuProcessId,fuGuid,data,dataSize);
01021
01022 nbPendingSMDqmDiscards_++;
01023
01024 acceptSMDqmDiscard_[fuDqmId]++;
01025 if(acceptSMDqmDiscard_[fuDqmId]>1)
01026 LOG4CPLUS_WARN(log_,"DQM Cell " << fuDqmId << " being sent more than once for folder "
01027 << folderId << " process " << fuProcessId << " guid " << fuGuid);
01028 nbSentDqm_++;
01029 }
01030 }
01031
01032
01033
01034 bool FUResourceTable::isLastMessageOfEvent(MemRef_t* bufRef)
01035 {
01036 while (0!=bufRef->getNextReference()) bufRef=bufRef->getNextReference();
01037
01038 I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block=
01039 (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)bufRef->getDataLocation();
01040
01041 UInt_t iBlock =block->blockNb;
01042 UInt_t nBlock =block->nbBlocksInSuperFragment;
01043 UInt_t iSuperFrag=block->superFragmentNb;
01044 UInt_t nSuperFrag=block->nbSuperFragmentsInEvent;
01045
01046 return ((iSuperFrag==nSuperFrag-1)&&(iBlock==nBlock-1));
01047 }
01048
01049
01050 void FUResourceTable::injectCRCError()
01051 {
01052 for (UInt_t i=0;i<resources_.size();i++) {
01053 resources_[i]->scheduleCRCError();
01054 }
01055 }
01056 void FUResourceTable::printWorkLoopStatus()
01057 {
01058 std::cout << "Workloop status===============" << std::endl;
01059 std::cout << "==============================" << std::endl;
01060 if(wlSendData_!=0)
01061 std::cout << "SendData -> " << wlSendData_->isActive() << std::endl;
01062 if(wlSendDqm_!=0)
01063 std::cout << "SendDqm -> " << wlSendDqm_->isActive() << std::endl;
01064 if(wlDiscard_!=0)
01065 std::cout << "Discard -> " << wlDiscard_->isActive() << std::endl;
01066 std::cout << "Workloops Active -> " << isActive_ << std::endl;
01067
01068 }
01069
01070
01071 void FUResourceTable::lastResort()
01072 {
01073 std::cout << "lastResort: " << shmBuffer_->nbRawCellsToRead()
01074 << " more rawcells to read " << std::endl;
01075 while(shmBuffer_->nbRawCellsToRead()!=0){
01076 FUShmRawCell* newCell=shmBuffer_->rawCellToRead();
01077 std::cout << "lastResort: " << shmBuffer_->nbRawCellsToRead() << std::endl;
01078 shmBuffer_->scheduleRawCellForDiscardServerSide(newCell->index());
01079 std::cout << "lastResort: schedule raw cell for discard" << std::endl;
01080 }
01081 }