14 #include "toolbox/task/WorkLoopFactory.h"
15 #include "interface/evb/i2oEVBMsgs.h"
16 #include "xcept/tools.h"
46 xdaq::Application*app)
58 , nbDqmCells_(nbDqmCells)
59 , nbRawCells_(nbRawCells)
60 , nbRecoCells_(nbRecoCells)
61 , acceptSMDataDiscard_(0)
62 , acceptSMDqmDiscard_(0)
64 , shutdownTimeout_(timeout)
66 , nbClientsToShutDown_(0)
67 , isReadyToShutDown_(
true)
72 , lock_(toolbox::BSem::FULL)
76 initialize(segmentationMode,
77 nbRawCells,nbRecoCells,nbDqmCells,
78 rawCellSize,recoCellSize,dqmCellSize);
87 wlSendData_->cancel();
88 toolbox::task::getWorkLoopFactory()->removeWorkLoop(
"SendData",
"waiting");
92 toolbox::task::getWorkLoopFactory()->removeWorkLoop(
"SendDqm",
"waiting");
96 toolbox::task::getWorkLoopFactory()->removeWorkLoop(
"Discard",
"waiting");
100 LOG4CPLUS_INFO(log_,
"SHARED MEMORY SUCCESSFULLY RELEASED.");
101 if (0!=acceptSMDataDiscard_)
delete [] acceptSMDataDiscard_;
102 if (0!= acceptSMDqmDiscard_)
delete [] acceptSMDqmDiscard_;
123 nbRawCells,nbRecoCells,nbDqmCells,
124 rawCellSize,recoCellSize,dqmCellSize);
126 string msg =
"CREATION OF SHARED MEMORY SEGMENT FAILED!";
127 LOG4CPLUS_FATAL(log_,msg);
132 resources_.push_back(
new FUResource(
i,log_,frb_,app_));
133 freeResourceIds_.push(
i);
136 acceptSMDataDiscard_ =
new bool[nbRecoCells];
137 acceptSMDqmDiscard_ =
new int[nbDqmCells];
148 toolbox::task::getWorkLoopFactory()->getWorkLoop(
"SendData",
"waiting");
149 if (!wlSendData_->isActive()) wlSendData_->activate();
151 wlSendData_->submit(asSendData_);
154 string msg =
"Failed to start workloop 'SendData'.";
163 bool reschedule=
true;
168 LOG4CPLUS_INFO(log_,
"Don't reschedule sendData workloop.");
170 shmBuffer_->finishReadingRecoCell(cell);
171 shmBuffer_->discardRecoCell(cellIndex);
174 else if (isHalting_) {
175 LOG4CPLUS_INFO(log_,
"sendData: isHalting, discard recoCell.");
177 shmBuffer_->finishReadingRecoCell(cell);
178 shmBuffer_->discardRecoCell(cellIndex);
182 if (cell->
type()==0) {
189 shmBuffer_->finishReadingRecoCell(cell);
192 nbPendingSMDiscards_++;
195 sendInitMessage(cellIndex,cellOutModId,cellFUProcId,cellFUGuid,
196 cellPayloadAddr,cellEventSize);
198 else if (cell->
type()==1) {
208 shmBuffer_->finishReadingRecoCell(cell);
211 nbPendingSMDiscards_++;
212 resources_[cellRawIndex]->incNbSent();
213 if (resources_[cellRawIndex]->nbSent()==1) nbSent_++;
216 sendDataEvent(cellIndex,cellRunNumber,cellEvtNumber,cellOutModId,
217 cellFUProcId,cellFUGuid,cellPayloadAddr,cellEventSize);
219 else if (cell->
type()==2) {
228 shmBuffer_->finishReadingRecoCell(cell);
231 nbPendingSMDiscards_++;
232 resources_[cellRawIndex]->incNbSent();
233 if (resources_[cellRawIndex]->nbSent()==1) { nbSent_++; nbSentError_++; }
236 sendErrorEvent(cellIndex,
runNumber_,cellEvtNumber,
237 cellFUProcId,cellFUGuid,cellPayloadAddr,cellEventSize);
240 string errmsg=
"Unknown RecoCell type (neither INIT/DATA/ERROR).";
245 LOG4CPLUS_FATAL(log_,
"Failed to send EVENT DATA to StorageManager: "
246 <<xcept::stdformat_exception_history(e));
259 wlSendDqm_=toolbox::task::getWorkLoopFactory()->getWorkLoop(
"SendDqm",
"waiting");
260 if (!wlSendDqm_->isActive()) wlSendDqm_->activate();
262 wlSendDqm_->submit(asSendDqm_);
265 string msg =
"Failed to start workloop 'SendDqm'.";
274 bool reschedule=
true;
280 LOG4CPLUS_WARN(log_,
"Don't reschedule sendDqm workloop.");
281 std::cout <<
"shut down dqm workloop " << std::endl;
283 shmBuffer_->finishReadingDqmCell(cell);
284 shmBuffer_->discardDqmCell(cellIndex);
287 else if (isHalting_) {
289 shmBuffer_->finishReadingDqmCell(cell);
290 shmBuffer_->discardDqmCell(cellIndex);
302 sendDqmEvent(cellIndex,cellRunNumber,cellEvtAtUpdate,cellFolderId,
303 cellFUProcId,cellFUGuid,cellPayloadAddr,cellEventSize);
304 shmBuffer_->finishReadingDqmCell(cell);
307 LOG4CPLUS_FATAL(log_,
"Failed to send DQM DATA to StorageManager: "
308 <<xcept::stdformat_exception_history(e));
321 LOG4CPLUS_INFO(log_,
"Start 'discard' workloop.");
322 wlDiscard_=toolbox::task::getWorkLoopFactory()->getWorkLoop(
"Discard",
"waiting");
323 if (!wlDiscard_->isActive()) wlDiscard_->activate();
325 wlDiscard_->submit(asDiscard_);
329 string msg =
"Failed to start workloop 'Discard'.";
332 isReadyToShutDown_=
false;
342 bool reschedule =
true;
353 LOG4CPLUS_INFO(log_,
"nbClientsToShutDown = "<<nbClientsToShutDown_);
354 if (nbClientsToShutDown_>0) --nbClientsToShutDown_;
355 if (nbClientsToShutDown_==0) {
356 LOG4CPLUS_INFO(log_,
"Don't reschedule discard-workloop.");
362 shmBuffer_->discardRawCell(cell);
364 if (!shutDown && !isLumi) {
365 resources_[fuResourceId]->release();
367 freeResourceIds_.push(fuResourceId);
368 assert(freeResourceIds_.size()<=resources_.size());
372 sendDiscard(buResourceId);
373 if(!isStopping_)sendAllocate();
378 std::cout <<
" entered shutdown cycle " << std::endl;
379 shmBuffer_->writeRecoEmptyEvent();
382 std::cout <<
" shutdown cycle " <<shmBuffer_->nClients() <<
" "
384 if (shmBuffer_->nClients()==0&&
391 std::cout <<
" shutdown cycle attempt " << count << std::endl;
392 LOG4CPLUS_DEBUG(log_,
"FUResourceTable: Wait for all clients to detach,"
393 <<
" nClients="<<shmBuffer_->nClients()
396 ::usleep(shutdownTimeout_);
397 if(count*shutdownTimeout_ > 10000000)
398 LOG4CPLUS_WARN(log_,
"FUResourceTable:LONG Wait (>10s) for all clients to detach,"
399 <<
" nClients="<<shmBuffer_->nClients()
405 bool allEmpty =
false;
406 std::cout <<
"Checking if all dqm cells are empty " << std::endl;
415 shmBuffer_->unlock();
417 std::cout <<
"Making sure there are no dqm pending discards " << std::endl;
418 if(nbPendingSMDqmDiscards_ != 0)
420 LOG4CPLUS_WARN(log_,
"FUResourceTable: pending DQM discards not zero: ="
421 << nbPendingSMDqmDiscards_ <<
" while cells are all empty. This may cause problems at next start ");
424 shmBuffer_->writeDqmEmptyEvent();
425 isReadyToShutDown_ =
true;
437 assert(!freeResourceIds_.empty());
440 UInt_t fuResourceId=freeResourceIds_.front();
441 freeResourceIds_.pop();
453 bool eventComplete=
false;
455 I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *
block=
456 (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)bufRef->getDataLocation();
460 FUResource* resource =resources_[fuResourceId];
467 gettimeofday(&now,0);
469 frb_->setRBTimeStamp(((
uint64_t)(now.tv_sec) << 32) + (
uint64_t)(now.tv_usec));
471 frb_->setRBEventCount(nbCompleted_);
473 if (doCrcCheck_>0&&0==nbAllocated_%doCrcCheck_) resource->
doCrcCheck(
true);
492 if (doDumpEvents_>0&&nbCompleted_%doDumpEvents_==0)
494 shmBuffer_->finishWritingRawCell(resource->
shmCell());
502 bool lastMsg=isLastMessageOfEvent(bufRef);
504 shmBuffer_->releaseRawCell(resource->
shmCell());
507 freeResourceIds_.push(fuResourceId);
512 bu_->sendDiscard(buResourceId);
518 return eventComplete;
529 if (acceptSMDataDiscard_[recoIndex]) {
531 nbPendingSMDiscards_--;
533 acceptSMDataDiscard_[recoIndex] =
false;
536 shmBuffer_->discardRecoCell(recoIndex);
541 LOG4CPLUS_ERROR(log_,
"Spurious DATA discard by StorageManager, skip!");
559 unsigned int ntries = 0;
560 while(shmBuffer_->dqmState(dqmIndex)!=
dqm::SENT){
561 LOG4CPLUS_WARN(log_,
"DQM discard for cell "<< dqmIndex <<
" which is not yer in SENT state - waiting");
564 LOG4CPLUS_ERROR(log_,
"DQM cell " << dqmIndex
565 <<
" discard timed out while cell still in state " << shmBuffer_->dqmState(dqmIndex) );
570 if (acceptSMDqmDiscard_[dqmIndex]>0) {
571 acceptSMDqmDiscard_[dqmIndex]--;
572 if(nbPendingSMDqmDiscards_>0){
573 nbPendingSMDqmDiscards_--;
576 LOG4CPLUS_WARN(log_,
"Spurious??? DQM discard by StorageManager, index " << dqmIndex
577 <<
" cell state " << shmBuffer_->dqmState(dqmIndex) <<
" accept flag " << acceptSMDqmDiscard_[dqmIndex];);
581 shmBuffer_->discardDqmCell(dqmIndex);
587 LOG4CPLUS_ERROR(log_,
"Spurious DQM discard for cell " << dqmIndex
588 <<
" from StorageManager while cell is not accepting discards");
603 I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *
msg =
604 (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *)bufRef->getDataLocation();
608 for(
unsigned int i = 0;
i < nbRawCells_;
i++)
609 shmBuffer_->writeRawLumiSectionEvent(msg->lumiSection);
618 shmBuffer_->finishReadingRawCell(cell);
619 shmBuffer_->scheduleRawCellForDiscard(fuResourceId);
627 vector<pid_t> pids=cellPrcIds();
628 UInt_t iRawCell=pids.size();
629 for (
UInt_t i=0;
i<pids.size();
i++) {
if (pid==pids[
i]) { iRawCell=
i;
break; } }
631 if (iRawCell<pids.size()){
632 shmBuffer_->writeErrorEventData(runNumber,pid,iRawCell);
636 LOG4CPLUS_WARN(log_,
"No raw data to send to error stream for process " << pid);
637 shmBuffer_->removeClientPrcId(pid);
645 ostringstream oss; oss<<
"/tmp/evt"<<cell->
evtNumber()<<
".dump";
646 ofstream
fout(oss.str().c_str());
650 for (
unsigned int i=0;
i<cell->
nFed();
i++) {
652 fout<<
"# fedid "<<
i<<endl;
653 unsigned char* addr=cell->
fedAddr(
i);
655 fout<<setiosflags(ios::right)<<setw(2)<<hex<<(int)(*addr)<<dec;
684 nbClientsToShutDown_ = nbClients();
685 isReadyToShutDown_ =
false;
687 if (nbClientsToShutDown_==0) {
688 LOG4CPLUS_INFO(log_,
"No clients to shut down. Checking if there are raw cells not assigned to any process yet");
693 LOG4CPLUS_WARN(log_,
"Schedule discard at STOP for orphaned event in state "
695 shmBuffer_->scheduleRawCellForDiscardServerSide(
i);
698 shmBuffer_->scheduleRawEmptyCellForDiscard();
702 for (
UInt_t i=0;
i<
n;++
i) shmBuffer_->writeRawEmptyEvent();
710 for (
UInt_t i=0;
i<resources_.size();
i++) {
711 resources_[
i]->release();
712 delete resources_[
i];
715 while (!freeResourceIds_.empty()) freeResourceIds_.pop();
723 for (
UInt_t i=0;
i<shmBuffer_->nRecoCells();
i++) acceptSMDataDiscard_[
i]=
false;
724 for (
UInt_t i=0;
i<shmBuffer_->nDqmCells();
i++) acceptSMDqmDiscard_[
i] = 0;
727 nbAllocated_ =nbPending_;
732 nbPendingSMDiscards_ =0;
733 nbPendingSMDqmDiscards_=0;
751 if (0!=shmBuffer_) result=shmBuffer_->nClients();
762 for (
UInt_t i=0;
i<
n;
i++) result.push_back(shmBuffer_->clientPrcId(
i));
776 ss<<shmBuffer_->clientPrcId(
i);
792 if (state==
evt::EMPTY) result.push_back(
"EMPTY");
793 else if (state==
evt::STOP) result.push_back(
"STOP");
797 else if (state==
evt::RAWREAD) result.push_back(
"RAWREAD");
802 else if (state==
evt::SENDING) result.push_back(
"SENDING");
803 else if (state==
evt::SENT) result.push_back(
"SENT");
806 shmBuffer_->unlock();
819 if (state==
dqm::EMPTY) result.push_back(
"EMPTY");
820 else if (state==
dqm::WRITING) result.push_back(
"WRITING");
821 else if (state==
dqm::WRITTEN) result.push_back(
"WRITTEN");
822 else if (state==
dqm::SENDING) result.push_back(
"SENDING");
823 else if (state==
dqm::SENT) result.push_back(
"SENT");
826 shmBuffer_->unlock();
839 for (
UInt_t i=0;
i<
n;
i++) result.push_back(shmBuffer_->evtNumber(
i));
840 shmBuffer_->unlock();
853 for (
UInt_t i=0;
i<
n;
i++) result.push_back(shmBuffer_->evtPrcId(
i));
854 shmBuffer_->unlock();
867 for (
UInt_t i=0;
i<
n;
i++) result.push_back(shmBuffer_->evtTimeStamp(
i));
868 shmBuffer_->unlock();
881 UInt_t nbFreeSlots = this->nbFreeSlots();
882 UInt_t nbFreeSlotsMax = resources_.size()/2;
883 if (nbFreeSlots>nbFreeSlotsMax) {
886 fuResourceIds.push_back(allocateResource());
887 bu_->sendAllocate(fuResourceIds);
896 bu_->sendDiscard(buResourceId);
910 LOG4CPLUS_ERROR(log_,
"No StorageManager, DROP INIT MESSAGE!");
913 acceptSMDataDiscard_[fuResourceId] =
true;
914 UInt_t nbBytes=sm_->sendInitMessage(fuResourceId,outModId,fuProcessId,
915 fuGuid,data,dataSize);
917 sumOfSizes_ +=nbBytes;
933 LOG4CPLUS_ERROR(log_,
"No StorageManager, DROP DATA EVENT!");
936 acceptSMDataDiscard_[fuResourceId] =
true;
937 UInt_t nbBytes=sm_->sendDataEvent(fuResourceId,runNumber,evtNumber,
938 outModId,fuProcessId,fuGuid,
941 sumOfSizes_ +=nbBytes;
956 LOG4CPLUS_ERROR(log_,
"No StorageManager, DROP ERROR EVENT!");
959 acceptSMDataDiscard_[fuResourceId] =
true;
960 UInt_t nbBytes=sm_->sendErrorEvent(fuResourceId,runNumber,evtNumber,
961 fuProcessId,fuGuid,data,dataSize);
963 sumOfSizes_ +=nbBytes;
1002 LOG4CPLUS_WARN(log_,
"No StorageManager, DROP DQM EVENT.");
1005 sm_->sendDqmEvent(fuDqmId,runNumber,evtAtUpdate,folderId,
1006 fuProcessId,fuGuid,data,dataSize);
1008 nbPendingSMDqmDiscards_++;
1010 acceptSMDqmDiscard_[fuDqmId]++;
1011 if(acceptSMDqmDiscard_[fuDqmId]>1)
1012 LOG4CPLUS_WARN(log_,
"DQM Cell " << fuDqmId <<
" being sent more than once for folder "
1013 << folderId <<
" process " << fuProcessId <<
" guid " << fuGuid);
1022 while (0!=bufRef->getNextReference()) bufRef=bufRef->getNextReference();
1024 I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *
block=
1025 (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)bufRef->getDataLocation();
1027 UInt_t iBlock =block->blockNb;
1028 UInt_t nBlock =block->nbBlocksInSuperFragment;
1029 UInt_t iSuperFrag=block->superFragmentNb;
1030 UInt_t nSuperFrag=block->nbSuperFragmentsInEvent;
1032 return ((iSuperFrag==nSuperFrag-1)&&(iBlock==nBlock-1));
1038 for (
UInt_t i=0;
i<resources_.size();
i++) {
1039 resources_[
i]->scheduleCRCError();
1044 std::cout <<
"Workloop status===============" << std::endl;
1045 std::cout <<
"==============================" << std::endl;
1047 std::cout <<
"SendData -> " << wlSendData_->isActive() << std::endl;
1049 std::cout <<
"SendDqm -> " << wlSendDqm_->isActive() << std::endl;
1051 std::cout <<
"Discard -> " << wlDiscard_->isActive() << std::endl;
1052 std::cout <<
"Workloops Active -> " << isActive_ << std::endl;
1059 std::cout <<
"lastResort: " << shmBuffer_->nbRawCellsToRead()
1060 <<
" more rawcells to read " << std::endl;
1061 while(shmBuffer_->nbRawCellsToRead()!=0){
1063 std::cout <<
"lastResort: " << shmBuffer_->nbRawCellsToRead() << std::endl;
1064 shmBuffer_->scheduleRawEmptyCellForDiscardServerSide(newCell);
1065 std::cout <<
"lastResort: schedule raw cell for discard" << std::endl;
unsigned int index() const
static const char runNumber_[]
unsigned int fuGuid() const
bool discardDataEvent(MemRef_t *bufRef)
static int shm_nattch(int shmid)
void process(MemRef_t *bufRef)
std::vector< UInt_t > UIntVec_t
std::vector< UInt_t > cellEvtNumbers() const
unsigned int type() const
FUResourceTable(bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize, BUProxy *bu, SMProxy *sm, log4cplus::Logger logger, unsigned int, EvffedFillerRB *frb, xdaq::Application *)
unsigned int index() const
unsigned int runNumber() const
std::vector< std::string > cellStates() const
unsigned int fuGuid() const
unsigned int folderId() const
bool handleCrashedEP(UInt_t runNumber, pid_t pid)
UInt_t nbErrors(bool reset=true)
unsigned int eventSize() const
toolbox::mem::Reference MemRef_t
unsigned int evtAtUpdate() const
void sendErrorEvent(UInt_t fuResourceId, UInt_t runNumber, UInt_t evtNumber, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
bool buildResource(MemRef_t *bufRef)
bool sendData(toolbox::task::WorkLoop *workLoop)
unsigned int fuProcessId() const
static bool releaseSharedMemory()
bool sendDqm(toolbox::task::WorkLoop *workLoop)
unsigned int evtNumber() const
UInt_t allocateResource()
void sendDiscard(UInt_t buResourceId)
unsigned char * payloadAddr() const
void sendDataEvent(UInt_t fuResourceId, UInt_t runNumber, UInt_t evtNumber, UInt_t outModId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
unsigned int evtNumber() const
unsigned int rawCellIndex() const
unsigned int fuProcessId() const
unsigned int runNumber() const
bool discard(toolbox::task::WorkLoop *workLoop)
std::vector< time_t > cellTimeStamps() const
std::vector< pid_t > clientPrcIds() const
void startSendDqmWorkLoop()
void startSendDataWorkLoop()
block
Formatting index page's pieces.
unsigned char * fedAddr(unsigned int i) const
static FUShmBuffer * createShmBuffer(bool semgmentationMode, unsigned int nRawCells, unsigned int nRecoCells, unsigned int nDqmCells, unsigned int rawCellSize=0x400000, unsigned int recoCellSize=0x400000, unsigned int dqmCellSize=0x400000)
bool isLastMessageOfEvent(MemRef_t *bufRef)
unsigned char * payloadAddr() const
evf::FUShmRawCell * shmCell()
std::string clientPrcIdsAsString() const
unsigned long long uint64_t
void printWorkLoopStatus()
unsigned int eventSize() const
void doCrcCheck(bool doCrcCheck)
unsigned int index() const
std::vector< pid_t > cellPrcIds() const
unsigned int nFed() const
unsigned int outModId() const
void dumpEvent(evf::FUShmRawCell *cell)
unsigned int fuResourceId() const
UInt_t nbCrcErrors(bool reset=true)
void startDiscardWorkLoop()
unsigned int fedSize(unsigned int i) const
void postEndOfLumiSection(MemRef_t *bufRef)
void sendDqmEvent(UInt_t fuDqmId, UInt_t runNumber, UInt_t evtAtUpdate, UInt_t folderId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
virtual ~FUResourceTable()
std::vector< std::string > dqmCellStates() const
void allocate(FUShmRawCell *shmCell)
bool discardDqmEvent(MemRef_t *bufRef)
unsigned int buResourceId() const
void sendInitMessage(UInt_t fuResourceId, UInt_t outModId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
void initialize(bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize)