00001
00002
00003
00004
00005
00006
00008
00009
00010 #include "EventFilter/ResourceBroker/interface/FUResourceTable.h"
00011 #include "FWCore/Utilities/interface/CRC16.h"
00012 #include "EvffedFillerRB.h"
00013
00014 #include "toolbox/task/WorkLoopFactory.h"
00015 #include "interface/evb/i2oEVBMsgs.h"
00016 #include "xcept/tools.h"
00017
00018
00019 #include <fstream>
00020 #include <sstream>
00021 #include <iomanip>
00022 #include <unistd.h>
00023
00024 #include <sys/types.h>
00025 #include <signal.h>
00026
00027 using namespace evf;
00028 using namespace std;
00029
00030
00032
00034
00035
00036 FUResourceTable::FUResourceTable(bool segmentationMode,
00037 UInt_t nbRawCells,
00038 UInt_t nbRecoCells,
00039 UInt_t nbDqmCells,
00040 UInt_t rawCellSize,
00041 UInt_t recoCellSize,
00042 UInt_t dqmCellSize,
00043 BUProxy *bu,
00044 SMProxy *sm,
00045 log4cplus::Logger logger,
00046 unsigned int timeout,
00047 EvffedFillerRB *frb,
00048 xdaq::Application*app)
00049 throw (evf::Exception)
00050 : bu_(bu)
00051 , sm_(sm)
00052 , log_(logger)
00053 , wlSendData_(0)
00054 , asSendData_(0)
00055 , wlSendDqm_(0)
00056 , asSendDqm_(0)
00057 , wlDiscard_(0)
00058 , asDiscard_(0)
00059 , shmBuffer_(0)
00060 , nbDqmCells_(nbDqmCells)
00061 , nbRawCells_(nbRawCells)
00062 , nbRecoCells_(nbRecoCells)
00063 , acceptSMDataDiscard_(0)
00064 , acceptSMDqmDiscard_(0)
00065 , doCrcCheck_(1)
00066 , shutdownTimeout_(timeout)
00067 , nbPending_(0)
00068 , nbClientsToShutDown_(0)
00069 , isReadyToShutDown_(true)
00070 , isActive_(false)
00071 , isHalting_(false)
00072 , isStopping_(false)
00073 , runNumber_(0xffffffff)
00074 , lock_(toolbox::BSem::FULL)
00075 , frb_(frb)
00076 , app_(app)
00077 {
00078 initialize(segmentationMode,
00079 nbRawCells,nbRecoCells,nbDqmCells,
00080 rawCellSize,recoCellSize,dqmCellSize);
00081 }
00082
00083
00084
00085 FUResourceTable::~FUResourceTable()
00086 {
00087 clear();
00088 if(wlSendData_){
00089 wlSendData_->cancel();
00090 toolbox::task::getWorkLoopFactory()->removeWorkLoop("SendData","waiting");
00091 }
00092 if(wlSendDqm_){
00093 wlSendDqm_->cancel();
00094 toolbox::task::getWorkLoopFactory()->removeWorkLoop("SendDqm","waiting");
00095 }
00096 if(wlDiscard_){
00097 wlDiscard_->cancel();
00098 toolbox::task::getWorkLoopFactory()->removeWorkLoop("Discard","waiting");
00099 }
00100 shmdt(shmBuffer_);
00101 if (FUShmBuffer::releaseSharedMemory())
00102 LOG4CPLUS_INFO(log_,"SHARED MEMORY SUCCESSFULLY RELEASED.");
00103 if (0!=acceptSMDataDiscard_) delete [] acceptSMDataDiscard_;
00104 if (0!= acceptSMDqmDiscard_) delete [] acceptSMDqmDiscard_;
00105 }
00106
00107
00109
00111
00112
00113 void FUResourceTable::initialize(bool segmentationMode,
00114 UInt_t nbRawCells,
00115 UInt_t nbRecoCells,
00116 UInt_t nbDqmCells,
00117 UInt_t rawCellSize,
00118 UInt_t recoCellSize,
00119 UInt_t dqmCellSize)
00120 throw (evf::Exception)
00121 {
00122 clear();
00123
00124 shmBuffer_=FUShmBuffer::createShmBuffer(segmentationMode,
00125 nbRawCells,nbRecoCells,nbDqmCells,
00126 rawCellSize,recoCellSize,dqmCellSize);
00127 if (0==shmBuffer_) {
00128 string msg = "CREATION OF SHARED MEMORY SEGMENT FAILED!";
00129 LOG4CPLUS_FATAL(log_,msg);
00130 XCEPT_RAISE(evf::Exception,msg);
00131 }
00132
00133 for (UInt_t i=0;i<nbRawCells_;i++) {
00134 resources_.push_back(new FUResource(i,log_,frb_,app_));
00135 freeResourceIds_.push(i);
00136 }
00137
00138 acceptSMDataDiscard_ = new bool[nbRecoCells];
00139 acceptSMDqmDiscard_ = new int[nbDqmCells];
00140
00141 resetCounters();
00142 }
00143
00144
00145
00146 void FUResourceTable::startSendDataWorkLoop() throw (evf::Exception)
00147 {
00148 try {
00149 wlSendData_=
00150 toolbox::task::getWorkLoopFactory()->getWorkLoop("SendData","waiting");
00151 if (!wlSendData_->isActive()) wlSendData_->activate();
00152 asSendData_=toolbox::task::bind(this,&FUResourceTable::sendData,"SendData");
00153 wlSendData_->submit(asSendData_);
00154 }
00155 catch (xcept::Exception& e) {
00156 string msg = "Failed to start workloop 'SendData'.";
00157 XCEPT_RETHROW(evf::Exception,msg,e);
00158 }
00159 }
00160
00161
00162
00163 bool FUResourceTable::sendData(toolbox::task::WorkLoop* )
00164 {
00165 bool reschedule=true;
00166
00167 FUShmRecoCell* cell=shmBuffer_->recoCellToRead();
00168
00169 if (0==cell->eventSize()) {
00170 LOG4CPLUS_INFO(log_,"Don't reschedule sendData workloop.");
00171 UInt_t cellIndex=cell->index();
00172 shmBuffer_->finishReadingRecoCell(cell);
00173 shmBuffer_->discardRecoCell(cellIndex);
00174 reschedule=false;
00175 }
00176 else if (isHalting_) {
00177 LOG4CPLUS_INFO(log_,"sendData: isHalting, discard recoCell.");
00178 UInt_t cellIndex=cell->index();
00179 shmBuffer_->finishReadingRecoCell(cell);
00180 shmBuffer_->discardRecoCell(cellIndex);
00181 }
00182 else {
00183 try {
00184 if (cell->type()==0) {
00185 UInt_t cellIndex = cell->index();
00186 UInt_t cellOutModId = cell->outModId();
00187 UInt_t cellFUProcId = cell->fuProcessId();
00188 UInt_t cellFUGuid = cell->fuGuid();
00189 UChar_t* cellPayloadAddr = cell->payloadAddr();
00190 UInt_t cellEventSize = cell->eventSize();
00191 shmBuffer_->finishReadingRecoCell(cell);
00192
00193 lock();
00194 nbPendingSMDiscards_++;
00195 unlock();
00196
00197 sendInitMessage(cellIndex,cellOutModId,cellFUProcId,cellFUGuid,
00198 cellPayloadAddr,cellEventSize);
00199 }
00200 else if (cell->type()==1) {
00201 UInt_t cellIndex = cell->index();
00202 UInt_t cellRawIndex = cell->rawCellIndex();
00203 UInt_t cellRunNumber = cell->runNumber();
00204 UInt_t cellEvtNumber = cell->evtNumber();
00205 UInt_t cellOutModId = cell->outModId();
00206 UInt_t cellFUProcId = cell->fuProcessId();
00207 UInt_t cellFUGuid = cell->fuGuid();
00208 UChar_t *cellPayloadAddr = cell->payloadAddr();
00209 UInt_t cellEventSize = cell->eventSize();
00210 shmBuffer_->finishReadingRecoCell(cell);
00211
00212 lock();
00213 nbPendingSMDiscards_++;
00214 resources_[cellRawIndex]->incNbSent();
00215 if (resources_[cellRawIndex]->nbSent()==1) nbSent_++;
00216 unlock();
00217
00218 sendDataEvent(cellIndex,cellRunNumber,cellEvtNumber,cellOutModId,
00219 cellFUProcId,cellFUGuid,cellPayloadAddr,cellEventSize);
00220 }
00221 else if (cell->type()==2) {
00222 UInt_t cellIndex = cell->index();
00223 UInt_t cellRawIndex = cell->rawCellIndex();
00224
00225 UInt_t cellEvtNumber = cell->evtNumber();
00226 UInt_t cellFUProcId = cell->fuProcessId();
00227 UInt_t cellFUGuid = cell->fuGuid();
00228 UChar_t *cellPayloadAddr = cell->payloadAddr();
00229 UInt_t cellEventSize = cell->eventSize();
00230 shmBuffer_->finishReadingRecoCell(cell);
00231
00232 lock();
00233 nbPendingSMDiscards_++;
00234 resources_[cellRawIndex]->incNbSent();
00235 if (resources_[cellRawIndex]->nbSent()==1) { nbSent_++; nbSentError_++; }
00236 unlock();
00237
00238 sendErrorEvent(cellIndex,runNumber_,cellEvtNumber,
00239 cellFUProcId,cellFUGuid,cellPayloadAddr,cellEventSize);
00240 }
00241 else {
00242 string errmsg="Unknown RecoCell type (neither INIT/DATA/ERROR).";
00243 XCEPT_RAISE(evf::Exception,errmsg);
00244 }
00245 }
00246 catch (xcept::Exception& e) {
00247 LOG4CPLUS_FATAL(log_,"Failed to send EVENT DATA to StorageManager: "
00248 <<xcept::stdformat_exception_history(e));
00249 reschedule=false;
00250 }
00251 }
00252
00253 return reschedule;
00254 }
00255
00256
00257
00258 void FUResourceTable::startSendDqmWorkLoop() throw (evf::Exception)
00259 {
00260 try {
00261 wlSendDqm_=toolbox::task::getWorkLoopFactory()->getWorkLoop("SendDqm","waiting");
00262 if (!wlSendDqm_->isActive()) wlSendDqm_->activate();
00263 asSendDqm_=toolbox::task::bind(this,&FUResourceTable::sendDqm,"SendDqm");
00264 wlSendDqm_->submit(asSendDqm_);
00265 }
00266 catch (xcept::Exception& e) {
00267 string msg = "Failed to start workloop 'SendDqm'.";
00268 XCEPT_RETHROW(evf::Exception,msg,e);
00269 }
00270 }
00271
00272
00273
00274 bool FUResourceTable::sendDqm(toolbox::task::WorkLoop* )
00275 {
00276 bool reschedule=true;
00277
00278 FUShmDqmCell* cell=shmBuffer_->dqmCellToRead();
00279 dqm::State_t state=shmBuffer_->dqmState(cell->index());
00280
00281 if (state==dqm::EMPTY) {
00282 LOG4CPLUS_WARN(log_,"Don't reschedule sendDqm workloop.");
00283 std::cout << "shut down dqm workloop " << std::endl;
00284 UInt_t cellIndex=cell->index();
00285 shmBuffer_->finishReadingDqmCell(cell);
00286 shmBuffer_->discardDqmCell(cellIndex);
00287 reschedule=false;
00288 }
00289 else if (isHalting_) {
00290 UInt_t cellIndex=cell->index();
00291 shmBuffer_->finishReadingDqmCell(cell);
00292 shmBuffer_->discardDqmCell(cellIndex);
00293 }
00294 else {
00295 try {
00296 UInt_t cellIndex = cell->index();
00297 UInt_t cellRunNumber = cell->runNumber();
00298 UInt_t cellEvtAtUpdate = cell->evtAtUpdate();
00299 UInt_t cellFolderId = cell->folderId();
00300 UInt_t cellFUProcId = cell->fuProcessId();
00301 UInt_t cellFUGuid = cell->fuGuid();
00302 UChar_t *cellPayloadAddr = cell->payloadAddr();
00303 UInt_t cellEventSize = cell->eventSize();
00304 sendDqmEvent(cellIndex,cellRunNumber,cellEvtAtUpdate,cellFolderId,
00305 cellFUProcId,cellFUGuid,cellPayloadAddr,cellEventSize);
00306 shmBuffer_->finishReadingDqmCell(cell);
00307 }
00308 catch (xcept::Exception& e) {
00309 LOG4CPLUS_FATAL(log_,"Failed to send DQM DATA to StorageManager: "
00310 <<xcept::stdformat_exception_history(e));
00311 reschedule=false;
00312 }
00313 }
00314
00315 return reschedule;
00316 }
00317
00318
00319
00320 void FUResourceTable::startDiscardWorkLoop() throw (evf::Exception)
00321 {
00322 try {
00323 LOG4CPLUS_INFO(log_,"Start 'discard' workloop.");
00324 wlDiscard_=toolbox::task::getWorkLoopFactory()->getWorkLoop("Discard","waiting");
00325 if (!wlDiscard_->isActive()) wlDiscard_->activate();
00326 asDiscard_=toolbox::task::bind(this,&FUResourceTable::discard,"Discard");
00327 wlDiscard_->submit(asDiscard_);
00328 isActive_=true;
00329 }
00330 catch (xcept::Exception& e) {
00331 string msg = "Failed to start workloop 'Discard'.";
00332 XCEPT_RETHROW(evf::Exception,msg,e);
00333 }
00334 isReadyToShutDown_=false;
00335 }
00336
00337
00338
00339 bool FUResourceTable::discard(toolbox::task::WorkLoop* )
00340 {
00341 FUShmRawCell* cell =shmBuffer_->rawCellToDiscard();
00342 evt::State_t state=shmBuffer_->evtState(cell->index());
00343
00344 bool reschedule =true;
00345 bool shutDown =(state==evt::STOP);
00346 bool isLumi =(state==evt::USEDLS);
00347 UInt_t fuResourceId=cell->fuResourceId();
00348 UInt_t buResourceId=cell->buResourceId();
00349
00350
00351
00352
00353
00354 if (shutDown) {
00355 LOG4CPLUS_INFO(log_,"nbClientsToShutDown = "<<nbClientsToShutDown_);
00356 if (nbClientsToShutDown_>0) --nbClientsToShutDown_;
00357 if (nbClientsToShutDown_==0) {
00358 LOG4CPLUS_INFO(log_,"Don't reschedule discard-workloop.");
00359 isActive_ =false;
00360 reschedule=false;
00361 }
00362 }
00363
00364 shmBuffer_->discardRawCell(cell);
00365 if(isLumi) nbEolDiscarded_++;
00366
00367 if (!shutDown && !isLumi) {
00368 if(fuResourceId >= nbResources()){
00369 LOG4CPLUS_WARN(log_,"cell " << cell->index() << " in state " << state
00370 << " scheduled for discard has no associated FU resource ");
00371 }
00372 else{
00373 resources_[fuResourceId]->release();
00374 lock();
00375 freeResourceIds_.push(fuResourceId);
00376 assert(freeResourceIds_.size()<=resources_.size());
00377 unlock();
00378
00379 if (!isHalting_) {
00380 sendDiscard(buResourceId);
00381 if(!isStopping_)sendAllocate();
00382 }
00383 }
00384 }
00385
00386 if (!reschedule) {
00387 std::cout << " entered shutdown cycle " << std::endl;
00388 shmBuffer_->writeRecoEmptyEvent();
00389 UInt_t count=0;
00390 while (count<100) {
00391 std::cout << " shutdown cycle " <<shmBuffer_->nClients() << " "
00392 << FUShmBuffer::shm_nattch(shmBuffer_->shmid()) << std::endl;
00393 if (shmBuffer_->nClients()==0&&
00394 FUShmBuffer::shm_nattch(shmBuffer_->shmid())==1) {
00395
00396 break;
00397 }
00398 else {
00399 count++;
00400 std::cout << " shutdown cycle attempt " << count << std::endl;
00401 LOG4CPLUS_DEBUG(log_,"FUResourceTable: Wait for all clients to detach,"
00402 <<" nClients="<<shmBuffer_->nClients()
00403 <<" nattch="<<FUShmBuffer::shm_nattch(shmBuffer_->shmid())
00404 <<" ("<<count<<")");
00405 ::usleep(shutdownTimeout_);
00406 if(count*shutdownTimeout_ > 10000000)
00407 LOG4CPLUS_WARN(log_,"FUResourceTable:LONG Wait (>10s) for all clients to detach,"
00408 <<" nClients="<<shmBuffer_->nClients()
00409 <<" nattch="<<FUShmBuffer::shm_nattch(shmBuffer_->shmid())
00410 <<" ("<<count<<")");
00411
00412 }
00413 }
00414 bool allEmpty = false;
00415 std::cout << "Checking if all dqm cells are empty " << std::endl;
00416 while(!allEmpty){
00417 UInt_t n=nbDqmCells_;
00418 allEmpty = true;
00419 shmBuffer_->lock();
00420 for (UInt_t i=0;i<n;i++) {
00421 dqm::State_t state=shmBuffer_->dqmState(i);
00422 if(state!=dqm::EMPTY) allEmpty = false;
00423 }
00424 shmBuffer_->unlock();
00425 }
00426 std::cout << "Making sure there are no dqm pending discards " << std::endl;
00427 if(nbPendingSMDqmDiscards_ != 0)
00428 {
00429 LOG4CPLUS_WARN(log_,"FUResourceTable: pending DQM discards not zero: ="
00430 << nbPendingSMDqmDiscards_ << " while cells are all empty. This may cause problems at next start ");
00431
00432 }
00433 shmBuffer_->writeDqmEmptyEvent();
00434 isReadyToShutDown_ = true;
00435
00436 }
00437
00438 return reschedule;
00439 }
00440
00441
00442
00443
00444 UInt_t FUResourceTable::allocateResource()
00445 {
00446 assert(!freeResourceIds_.empty());
00447
00448 lock();
00449 UInt_t fuResourceId=freeResourceIds_.front();
00450 freeResourceIds_.pop();
00451 nbPending_++;
00452 nbAllocated_++;
00453 unlock();
00454
00455 return fuResourceId;
00456 }
00457
00458
00459
00460 bool FUResourceTable::buildResource(MemRef_t* bufRef)
00461 {
00462 bool eventComplete=false;
00463 bool lastMsg=isLastMessageOfEvent(bufRef);
00464 I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block=
00465 (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)bufRef->getDataLocation();
00466
00467 UInt_t fuResourceId=(UInt_t)block->fuTransactionId;
00468 UInt_t buResourceId=(UInt_t)block->buResourceId;
00469 FUResource* resource =resources_[fuResourceId];
00470
00471 if (!resource->fatalError()&&!resource->isAllocated()) {
00472 FUShmRawCell* cell=shmBuffer_->rawCellToWrite();
00473 if(cell==0){bufRef->release(); return eventComplete;}
00474 resource->allocate(cell);
00475 timeval now;
00476 gettimeofday(&now,0);
00477
00478 frb_->setRBTimeStamp(((uint64_t)(now.tv_sec) << 32) + (uint64_t)(now.tv_usec));
00479
00480 frb_->setRBEventCount(nbCompleted_);
00481
00482 if (doCrcCheck_>0&&0==nbAllocated_%doCrcCheck_) resource->doCrcCheck(true);
00483 else resource->doCrcCheck(false);
00484 }
00485
00486
00487
00488 if (!resource->fatalError()) {
00489 resource->process(bufRef);
00490 lock();
00491 nbErrors_ +=resource->nbErrors();
00492 nbCrcErrors_+=resource->nbCrcErrors();
00493 unlock();
00494
00495
00496 if (resource->isComplete()) {
00497 lock();
00498 nbCompleted_++;
00499 nbPending_--;
00500 unlock();
00501 if (doDumpEvents_>0&&nbCompleted_%doDumpEvents_==0)
00502 dumpEvent(resource->shmCell());
00503 shmBuffer_->finishWritingRawCell(resource->shmCell());
00504 eventComplete=true;
00505 }
00506
00507 }
00508
00509
00510 if (resource->fatalError()) {
00511 if (lastMsg) {
00512 shmBuffer_->releaseRawCell(resource->shmCell());
00513 resource->release();
00514 lock();
00515 freeResourceIds_.push(fuResourceId);
00516 nbDiscarded_++;
00517 nbLost_++;
00518 nbPending_--;
00519 unlock();
00520 bu_->sendDiscard(buResourceId);
00521 sendAllocate();
00522 }
00523
00524 }
00525
00526 return eventComplete;
00527 }
00528
00529
00530
00531 bool FUResourceTable::discardDataEvent(MemRef_t* bufRef)
00532 {
00533 I2O_FU_DATA_DISCARD_MESSAGE_FRAME *msg;
00534 msg=(I2O_FU_DATA_DISCARD_MESSAGE_FRAME*)bufRef->getDataLocation();
00535 UInt_t recoIndex=msg->rbBufferID;
00536
00537 if (acceptSMDataDiscard_[recoIndex]) {
00538 lock();
00539 nbPendingSMDiscards_--;
00540 unlock();
00541 acceptSMDataDiscard_[recoIndex] = false;
00542
00543 if (!isHalting_) {
00544 shmBuffer_->discardRecoCell(recoIndex);
00545 bufRef->release();
00546 }
00547 }
00548 else {
00549 LOG4CPLUS_ERROR(log_,"Spurious DATA discard by StorageManager, skip!");
00550 }
00551
00552 if (isHalting_) {
00553 bufRef->release();
00554 return false;
00555 }
00556
00557 return true;
00558 }
00559
00560
00561
00562 bool FUResourceTable::discardDqmEvent(MemRef_t* bufRef)
00563 {
00564 I2O_FU_DQM_DISCARD_MESSAGE_FRAME *msg;
00565 msg=(I2O_FU_DQM_DISCARD_MESSAGE_FRAME*)bufRef->getDataLocation();
00566 UInt_t dqmIndex=msg->rbBufferID;
00567 unsigned int ntries = 0;
00568 while(shmBuffer_->dqmState(dqmIndex)!=dqm::SENT){
00569 LOG4CPLUS_WARN(log_,"DQM discard for cell "<< dqmIndex << " which is not yer in SENT state - waiting");
00570 ::usleep(10000);
00571 if(ntries++>10){
00572 LOG4CPLUS_ERROR(log_,"DQM cell " << dqmIndex
00573 << " discard timed out while cell still in state " << shmBuffer_->dqmState(dqmIndex) );
00574 bufRef->release();
00575 return true;
00576 }
00577 }
00578 if (acceptSMDqmDiscard_[dqmIndex]>0) {
00579 acceptSMDqmDiscard_[dqmIndex]--;
00580 if(nbPendingSMDqmDiscards_>0){
00581 nbPendingSMDqmDiscards_--;
00582 }
00583 else {
00584 LOG4CPLUS_WARN(log_,"Spurious??? DQM discard by StorageManager, index " << dqmIndex
00585 << " cell state " << shmBuffer_->dqmState(dqmIndex) << " accept flag " << acceptSMDqmDiscard_[dqmIndex];);
00586 }
00587
00588 if (!isHalting_) {
00589 shmBuffer_->discardDqmCell(dqmIndex);
00590 bufRef->release();
00591 }
00592
00593 }
00594 else {
00595 LOG4CPLUS_ERROR(log_,"Spurious DQM discard for cell " << dqmIndex
00596 << " from StorageManager while cell is not accepting discards");
00597 }
00598
00599 if (isHalting_) {
00600 bufRef->release();
00601 return false;
00602 }
00603
00604 return true;
00605 }
00606
00607
00608
00609 void FUResourceTable::postEndOfLumiSection(MemRef_t* bufRef)
00610 {
00611 I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *msg =
00612 (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *)bufRef->getDataLocation();
00613
00614
00615
00616 for(unsigned int i = 0; i < nbRawCells_; i++)
00617 {
00618 nbEolPosted_++;
00619 shmBuffer_->writeRawLumiSectionEvent(msg->lumiSection);
00620 }
00621 }
00622
00623
00624
00625 void FUResourceTable::dropEvent()
00626 {
00627 FUShmRawCell* cell=shmBuffer_->rawCellToRead();
00628 UInt_t fuResourceId=cell->fuResourceId();
00629 shmBuffer_->finishReadingRawCell(cell);
00630 shmBuffer_->scheduleRawCellForDiscard(fuResourceId);
00631 }
00632
00633
00634
00635 bool FUResourceTable::handleCrashedEP(UInt_t runNumber,pid_t pid)
00636 {
00637 bool retval = false;
00638 vector<pid_t> pids=cellPrcIds();
00639 UInt_t iRawCell=pids.size();
00640 for (UInt_t i=0;i<pids.size();i++) { if (pid==pids[i]) { iRawCell=i; break; } }
00641
00642 if (iRawCell<pids.size()){
00643 shmBuffer_->writeErrorEventData(runNumber,pid,iRawCell);
00644 retval = true;
00645 }
00646 else
00647 LOG4CPLUS_WARN(log_,"No raw data to send to error stream for process " << pid);
00648 shmBuffer_->removeClientPrcId(pid);
00649 return retval;
00650 }
00651
00652
00653
00654 void FUResourceTable::dumpEvent(FUShmRawCell* cell)
00655 {
00656 ostringstream oss; oss<<"/tmp/evt"<<cell->evtNumber()<<".dump";
00657 ofstream fout(oss.str().c_str());
00658 fout.fill('0');
00659
00660 fout<<"#\n# evt "<<cell->evtNumber()<<"\n#\n"<<endl;
00661 for (unsigned int i=0;i<cell->nFed();i++) {
00662 if (cell->fedSize(i)==0) continue;
00663 fout<<"# fedid "<<i<<endl;
00664 unsigned char* addr=cell->fedAddr(i);
00665 for (unsigned int j=0;j<cell->fedSize(i);j++) {
00666 fout<<setiosflags(ios::right)<<setw(2)<<hex<<(int)(*addr)<<dec;
00667 if ((j+1)%8) fout<<" "; else fout<<endl;
00668 ++addr;
00669 }
00670 fout<<endl;
00671 }
00672 fout.close();
00673 }
00674
00675
00676
00677 void FUResourceTable::stop()
00678 {
00679 isStopping_ = true;
00680 shutDownClients();
00681 }
00682
00683
00684
00685 void FUResourceTable::halt()
00686 {
00687 isHalting_=true;
00688 shutDownClients();
00689 }
00690
00691
00692
00693 void FUResourceTable::shutDownClients()
00694 {
00695 nbClientsToShutDown_ = nbClients();
00696 isReadyToShutDown_ = false;
00697
00698 if (nbClientsToShutDown_==0) {
00699 LOG4CPLUS_INFO(log_,"No clients to shut down. Checking if there are raw cells not assigned to any process yet");
00700 UInt_t n=nbResources();
00701 for (UInt_t i=0;i<n;i++) {
00702 evt::State_t state=shmBuffer_->evtState(i);
00703 if (state!=evt::EMPTY){
00704 LOG4CPLUS_WARN(log_,"Schedule discard at STOP for orphaned event in state "
00705 << state);
00706 shmBuffer_->scheduleRawCellForDiscardServerSide(i);
00707 }
00708 }
00709 shmBuffer_->scheduleRawEmptyCellForDiscard();
00710 }
00711 else {
00712 int checks=0;
00713
00714 while(shmBuffer_->nbRawCellsToWrite()<nbClients() && nbClients()!=0)
00715 {
00716 checks++;
00717 vector<pid_t> prcids=clientPrcIds();
00718 for (UInt_t i=0;i<prcids.size();i++) {
00719 pid_t pid =prcids[i];
00720 int status=kill(pid,0);
00721 if (status!=0) {
00722 LOG4CPLUS_ERROR(log_,"EP prc "<<pid<<" completed with error.");
00723 handleCrashedEP(runNumber_,pid);
00724 }
00725 }
00726
00727 LOG4CPLUS_WARN(log_,"no cell to write stop " << shmBuffer_->nbRawCellsToWrite()
00728 << " nClients " << nbClients());
00729 if(checks>10){
00730 string msg = "No Raw Cell to Write STOP messages";
00731 XCEPT_RAISE(evf::Exception,msg);
00732 }
00733 ::usleep(500000);
00734 }
00735 nbClientsToShutDown_ = nbClients();
00736 if(nbClientsToShutDown_==0){
00737 UInt_t n=nbResources();
00738 for (UInt_t i=0;i<n;i++) {
00739 evt::State_t state=shmBuffer_->evtState(i);
00740 if (state!=evt::EMPTY){
00741 LOG4CPLUS_WARN(log_,"Schedule discard at STOP for orphaned event in state "
00742 << state);
00743 shmBuffer_->setEvtDiscard(i,1);
00744 shmBuffer_->scheduleRawCellForDiscardServerSide(i);
00745 }
00746 }
00747 shmBuffer_->scheduleRawEmptyCellForDiscard();
00748 }
00749 UInt_t n=nbClientsToShutDown_;
00750 for (UInt_t i=0;i<n;++i) shmBuffer_->writeRawEmptyEvent();
00751 }
00752 }
00753
00754
00755
00756 void FUResourceTable::clear()
00757 {
00758 for (UInt_t i=0;i<resources_.size();i++) {
00759 resources_[i]->release();
00760 delete resources_[i];
00761 }
00762 resources_.clear();
00763 while (!freeResourceIds_.empty()) freeResourceIds_.pop();
00764 }
00765
00766
00767
00768 void FUResourceTable::resetCounters()
00769 {
00770 if (0!=shmBuffer_) {
00771 for (UInt_t i=0;i<shmBuffer_->nRecoCells();i++) acceptSMDataDiscard_[i]= false;
00772 for (UInt_t i=0;i<shmBuffer_->nDqmCells();i++) acceptSMDqmDiscard_[i] = 0;
00773 }
00774
00775 nbAllocated_ =nbPending_;
00776 nbCompleted_ =0;
00777 nbSent_ =0;
00778 nbSentError_ =0;
00779 nbSentDqm_ =0;
00780 nbPendingSMDiscards_ =0;
00781 nbPendingSMDqmDiscards_=0;
00782 nbDiscarded_ =0;
00783 nbLost_ =0;
00784 nbEolPosted_ =0;
00785 nbEolDiscarded_ =0;
00786
00787 nbErrors_ =0;
00788 nbCrcErrors_ =0;
00789 nbAllocSent_ =0;
00790
00791 sumOfSquares_ =0;
00792 sumOfSizes_ =0;
00793 isStopping_ =false;
00794 }
00795
00796
00797
00798 UInt_t FUResourceTable::nbClients() const
00799 {
00800 UInt_t result(0);
00801 if (0!=shmBuffer_) result=shmBuffer_->nClients();
00802 return result;
00803 }
00804
00805
00806
00807 vector<pid_t> FUResourceTable::clientPrcIds() const
00808 {
00809 vector<pid_t> result;
00810 if (0!=shmBuffer_) {
00811 UInt_t n=nbClients();
00812 for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->clientPrcId(i));
00813 }
00814 return result;
00815 }
00816
00817
00818
00819 string FUResourceTable::clientPrcIdsAsString() const
00820 {
00821 stringstream ss;
00822 if (0!=shmBuffer_) {
00823 UInt_t n=nbClients();
00824 for (UInt_t i=0;i<n;i++) {
00825 if (i>0) ss<<",";
00826 ss<<shmBuffer_->clientPrcId(i);
00827 }
00828 }
00829 return ss.str();
00830 }
00831
00832
00833
00834 vector<string> FUResourceTable::cellStates() const
00835 {
00836 vector<string> result;
00837 if (0!=shmBuffer_) {
00838 UInt_t n=nbResources();
00839 shmBuffer_->lock();
00840 for (UInt_t i=0;i<n;i++) {
00841 evt::State_t state=shmBuffer_->evtState(i);
00842 if (state==evt::EMPTY) result.push_back("EMPTY");
00843 else if (state==evt::STOP) result.push_back("STOP");
00844 else if (state==evt::LUMISECTION)result.push_back("LUMISECTION");
00845 else if (state==evt::USEDLS) result.push_back("USEDLS");
00846 else if (state==evt::RAWWRITING) result.push_back("RAWWRITING");
00847 else if (state==evt::RAWWRITTEN) result.push_back("RAWWRITTEN");
00848 else if (state==evt::RAWREADING) result.push_back("RAWREADING");
00849 else if (state==evt::RAWREAD) result.push_back("RAWREAD");
00850 else if (state==evt::PROCESSING) result.push_back("PROCESSING");
00851 else if (state==evt::PROCESSED) result.push_back("PROCESSED");
00852 else if (state==evt::RECOWRITING)result.push_back("RECOWRITING");
00853 else if (state==evt::RECOWRITTEN)result.push_back("RECOWRITTEN");
00854 else if (state==evt::SENDING) result.push_back("SENDING");
00855 else if (state==evt::SENT) result.push_back("SENT");
00856 else if (state==evt::DISCARDING) result.push_back("DISCARDING");
00857 }
00858 shmBuffer_->unlock();
00859 }
00860 return result;
00861 }
00862
00863 vector<string> FUResourceTable::dqmCellStates() const
00864 {
00865 vector<string> result;
00866 if (0!=shmBuffer_) {
00867 UInt_t n=nbDqmCells_;
00868 shmBuffer_->lock();
00869 for (UInt_t i=0;i<n;i++) {
00870 dqm::State_t state=shmBuffer_->dqmState(i);
00871 if (state==dqm::EMPTY) result.push_back("EMPTY");
00872 else if (state==dqm::WRITING) result.push_back("WRITING");
00873 else if (state==dqm::WRITTEN) result.push_back("WRITTEN");
00874 else if (state==dqm::SENDING) result.push_back("SENDING");
00875 else if (state==dqm::SENT) result.push_back("SENT");
00876 else if (state==dqm::DISCARDING) result.push_back("DISCARDING");
00877 }
00878 shmBuffer_->unlock();
00879 }
00880 return result;
00881 }
00882
00883
00884
00885 vector<UInt_t> FUResourceTable::cellEvtNumbers() const
00886 {
00887 vector<UInt_t> result;
00888 if (0!=shmBuffer_) {
00889 UInt_t n=nbResources();
00890 shmBuffer_->lock();
00891 for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->evtNumber(i));
00892 shmBuffer_->unlock();
00893 }
00894 return result;
00895 }
00896
00897
00898
00899 vector<pid_t> FUResourceTable::cellPrcIds() const
00900 {
00901 vector<pid_t> result;
00902 if (0!=shmBuffer_) {
00903 UInt_t n=nbResources();
00904 shmBuffer_->lock();
00905 for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->evtPrcId(i));
00906 shmBuffer_->unlock();
00907 }
00908 return result;
00909 }
00910
00911
00912
00913 vector<time_t> FUResourceTable::cellTimeStamps() const
00914 {
00915 vector<time_t> result;
00916 if (0!=shmBuffer_) {
00917 UInt_t n=nbResources();
00918 shmBuffer_->lock();
00919 for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->evtTimeStamp(i));
00920 shmBuffer_->unlock();
00921 }
00922 return result;
00923 }
00924
00925
00927
00929
00930
00931 void FUResourceTable::sendAllocate()
00932 {
00933 UInt_t nbFreeSlots = this->nbFreeSlots();
00934 UInt_t nbFreeSlotsMax = resources_.size()/2;
00935 if (nbFreeSlots>nbFreeSlotsMax) {
00936 UIntVec_t fuResourceIds;
00937 for (UInt_t i=0;i<nbFreeSlots;i++)
00938 fuResourceIds.push_back(allocateResource());
00939 bu_->sendAllocate(fuResourceIds);
00940 nbAllocSent_++;
00941 }
00942 }
00943
00944
00945
00946 void FUResourceTable::sendDiscard(UInt_t buResourceId)
00947 {
00948 bu_->sendDiscard(buResourceId);
00949 nbDiscarded_++;
00950 }
00951
00952
00953
00954 void FUResourceTable::sendInitMessage(UInt_t fuResourceId,
00955 UInt_t outModId,
00956 UInt_t fuProcessId,
00957 UInt_t fuGuid,
00958 UChar_t *data,
00959 UInt_t dataSize)
00960 {
00961 if (0==sm_) {
00962 LOG4CPLUS_ERROR(log_,"No StorageManager, DROP INIT MESSAGE!");
00963 }
00964 else {
00965 acceptSMDataDiscard_[fuResourceId] = true;
00966 UInt_t nbBytes=sm_->sendInitMessage(fuResourceId,outModId,fuProcessId,
00967 fuGuid,data,dataSize);
00968 sumOfSquares_+=(uint64_t)nbBytes*(uint64_t)nbBytes;
00969 sumOfSizes_ +=nbBytes;
00970 }
00971 }
00972
00973
00974
00975 void FUResourceTable::sendDataEvent(UInt_t fuResourceId,
00976 UInt_t runNumber,
00977 UInt_t evtNumber,
00978 UInt_t outModId,
00979 UInt_t fuProcessId,
00980 UInt_t fuGuid,
00981 UChar_t *data,
00982 UInt_t dataSize)
00983 {
00984 if (0==sm_) {
00985 LOG4CPLUS_ERROR(log_,"No StorageManager, DROP DATA EVENT!");
00986 }
00987 else {
00988 acceptSMDataDiscard_[fuResourceId] = true;
00989 UInt_t nbBytes=sm_->sendDataEvent(fuResourceId,runNumber,evtNumber,
00990 outModId,fuProcessId,fuGuid,
00991 data,dataSize);
00992 sumOfSquares_+=(uint64_t)nbBytes*(uint64_t)nbBytes;
00993 sumOfSizes_ +=nbBytes;
00994 }
00995 }
00996
00997
00998
00999 void FUResourceTable::sendErrorEvent(UInt_t fuResourceId,
01000 UInt_t runNumber,
01001 UInt_t evtNumber,
01002 UInt_t fuProcessId,
01003 UInt_t fuGuid,
01004 UChar_t *data,
01005 UInt_t dataSize)
01006 {
01007 if (0==sm_) {
01008 LOG4CPLUS_ERROR(log_,"No StorageManager, DROP ERROR EVENT!");
01009 }
01010 else {
01011 acceptSMDataDiscard_[fuResourceId] = true;
01012 UInt_t nbBytes=sm_->sendErrorEvent(fuResourceId,runNumber,evtNumber,
01013 fuProcessId,fuGuid,data,dataSize);
01014 sumOfSquares_+=(uint64_t)nbBytes*(uint64_t)nbBytes;
01015 sumOfSizes_ +=nbBytes;
01016 }
01017
01018
01019
01020
01021
01022
01023
01024
01025
01026
01027
01028
01029
01030
01031
01032
01033
01034
01035
01036
01037
01038
01039
01040 }
01041
01042
01043
01044 void FUResourceTable::sendDqmEvent(UInt_t fuDqmId,
01045 UInt_t runNumber,
01046 UInt_t evtAtUpdate,
01047 UInt_t folderId,
01048 UInt_t fuProcessId,
01049 UInt_t fuGuid,
01050 UChar_t* data,
01051 UInt_t dataSize)
01052 {
01053 if (0==sm_) {
01054 LOG4CPLUS_WARN(log_,"No StorageManager, DROP DQM EVENT.");
01055 }
01056 else {
01057 sm_->sendDqmEvent(fuDqmId,runNumber,evtAtUpdate,folderId,
01058 fuProcessId,fuGuid,data,dataSize);
01059
01060 nbPendingSMDqmDiscards_++;
01061
01062 acceptSMDqmDiscard_[fuDqmId]++;
01063 if(acceptSMDqmDiscard_[fuDqmId]>1)
01064 LOG4CPLUS_WARN(log_,"DQM Cell " << fuDqmId << " being sent more than once for folder "
01065 << folderId << " process " << fuProcessId << " guid " << fuGuid);
01066 nbSentDqm_++;
01067 }
01068 }
01069
01070
01071
01072 bool FUResourceTable::isLastMessageOfEvent(MemRef_t* bufRef)
01073 {
01074 while (0!=bufRef->getNextReference()) {
01075 bufRef=bufRef->getNextReference();
01076 }
01077 I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block=
01078 (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)bufRef->getDataLocation();
01079
01080 UInt_t iBlock =block->blockNb;
01081 UInt_t nBlock =block->nbBlocksInSuperFragment;
01082 UInt_t iSuperFrag=block->superFragmentNb;
01083 UInt_t nSuperFrag=block->nbSuperFragmentsInEvent;
01084 return ((iSuperFrag==nSuperFrag-1)&&(iBlock==nBlock-1));
01085 }
01086
01087
01088 void FUResourceTable::injectCRCError()
01089 {
01090 for (UInt_t i=0;i<resources_.size();i++) {
01091 resources_[i]->scheduleCRCError();
01092 }
01093 }
01094 void FUResourceTable::printWorkLoopStatus()
01095 {
01096 std::cout << "Workloop status===============" << std::endl;
01097 std::cout << "==============================" << std::endl;
01098 if(wlSendData_!=0)
01099 std::cout << "SendData -> " << wlSendData_->isActive() << std::endl;
01100 if(wlSendDqm_!=0)
01101 std::cout << "SendDqm -> " << wlSendDqm_->isActive() << std::endl;
01102 if(wlDiscard_!=0)
01103 std::cout << "Discard -> " << wlDiscard_->isActive() << std::endl;
01104 std::cout << "Workloops Active -> " << isActive_ << std::endl;
01105
01106 }
01107
01108
01109 void FUResourceTable::lastResort()
01110 {
01111 std::cout << "lastResort: " << shmBuffer_->nbRawCellsToRead()
01112 << " more rawcells to read " << std::endl;
01113 while(shmBuffer_->nbRawCellsToRead()!=0){
01114 FUShmRawCell* newCell=shmBuffer_->rawCellToRead();
01115 std::cout << "lastResort: " << shmBuffer_->nbRawCellsToRead() << std::endl;
01116 shmBuffer_->scheduleRawCellForDiscardServerSide(newCell->index());
01117 std::cout << "lastResort: schedule raw cell for discard" << std::endl;
01118 }
01119
01120 shmBuffer_->scheduleRawEmptyCellForDiscard();
01121 }