13 #include "interface/evb/i2oEVBMsgs.h"
14 #include "xcept/tools.h"
16 #include <sys/types.h>
// Constructor fragment (the signature line is not part of this excerpt).
// Every configuration argument is forwarded to the IPCMethod base class, and
// the shared-memory buffer pointer starts out null.
38 IPCMethod(segmentationMode, nbRawCells, nbRecoCells, nbDqmCells,
39 rawCellSize, recoCellSize, dqmCellSize, freeResReq, bu, sm,
40 logger, timeout, frb, app), shmBuffer_(0)
// Constructor body: set-up is delegated to initialize() with the cell counts
// and cell sizes.
43 initialize(segmentationMode, nbRawCells, nbRecoCells, nbDqmCells,
44 rawCellSize, recoCellSize, dqmCellSize);
// Teardown fragment (presumably the destructor -- the signature and the
// condition guarding this log line are not visible in this excerpt).
53 LOG4CPLUS_INFO(log_,
"SHARED MEMORY SUCCESSFULLY RELEASED.");
// Free the two discard-bookkeeping arrays. delete[] matches the new[] used
// when these members are allocated; the null checks are redundant in C++
// (delete[] on a null pointer is a no-op) but harmless.
54 if (0 != acceptSMDataDiscard_)
55 delete[] acceptSMDataDiscard_;
56 if (0 != acceptSMDqmDiscard_)
57 delete[] acceptSMDqmDiscard_;
// initialize() fragment: tail of the call that creates the shared-memory
// buffer (the start of the call is not visible in this excerpt).
71 nbRecoCells, nbDqmCells, rawCellSize, recoCellSize, dqmCellSize);
// A null buffer pointer means creation failed; this is logged at FATAL level.
72 if (0 == shmBuffer_) {
73 string msg =
"CREATION OF SHARED MEMORY SEGMENT FAILED!";
74 LOG4CPLUS_FATAL(log_, msg);
// One resource per raw cell; every index starts out on the free-id queue.
78 for (
UInt_t i = 0;
i < nbRawCells_;
i++) {
81 resources_.push_back(newResource);
82 freeResourceIds_.push(
i);
// Discard bookkeeping: one bool per reco cell and one int per DQM cell.
// NOTE(review): plain new[] leaves the elements uninitialised; the loops
// labelled "resetCounters" later in this file set them to false / 0 --
// confirm that runs before the arrays are first read.
85 acceptSMDataDiscard_ =
new bool[nbRecoCells];
86 acceptSMDqmDiscard_ =
new int[nbDqmCells];
// sendData() fragment (signature not visible in this excerpt).
// Reads the next reco cell from shared memory and forwards it according to
// cell->type(): 0 -> sendInitMessage, 1 -> sendDataEvent, 2 -> sendErrorEvent
// (the "Unknown RecoCell type" message names these INIT/DATA/ERROR).
// `reschedule` starts true and its final value is stored in sDataActive_.
94 bool reschedule =
true;
97 cell = shmBuffer_->recoCellToRead();
99 rethrowShmBufferException(e,
"FUResourceTable:sendData:recoCellToRead");
// Stop path: the cell is finished and discarded, then bit 7 of
// shutdownStatus_ is set.
103 LOG4CPLUS_INFO(log_,
"Don't reschedule sendData workloop.");
106 shmBuffer_->finishReadingRecoCell(cell);
107 shmBuffer_->discardRecoCell(cellIndex);
109 rethrowShmBufferException(e,
110 "FUResourceTable:sendData:finishReadingRecoCell/discardRecoCell");
112 shutdownStatus_|=1<<7;
// INIT cell: finish reading, count one more pending StorageManager discard,
// then send the init message.
116 if (cell->
type() == 0) {
125 shmBuffer_->finishReadingRecoCell(cell);
127 rethrowShmBufferException(e,
128 "FUResourceTable:sendData:finishReadingRecoCell");
132 nbPendingSMDiscards_++;
135 sendInitMessage(cellIndex, cellOutModId, cellFUProcId,
136 cellFUGuid, cellPayloadAddr, cellEventSize,
138 }
// DATA cell: same bookkeeping, plus the per-resource sent counter.
else if (cell->
type() == 1) {
149 shmBuffer_->finishReadingRecoCell(cell);
151 rethrowShmBufferException(e,
152 "FUResourceTable:sendData:finishReadingRecoCell");
156 nbPendingSMDiscards_++;
157 resources_[cellRawIndex]->incNbSent();
158 if (resources_[cellRawIndex]->nbSent() == 1)
162 sendDataEvent(cellIndex, cellRunNumber, cellEvtNumber,
163 cellOutModId, cellFUProcId, cellFUGuid,
164 cellPayloadAddr, cellEventSize);
165 }
// ERROR cell: same bookkeeping as the DATA branch.
else if (cell->
type() == 2) {
175 shmBuffer_->finishReadingRecoCell(cell);
// The label names the call that was actually attempted
// (finishReadingRecoCell), matching the INIT and DATA branches; it used to
// say "recoCellToRead", which pointed the diagnostic at the wrong call.
177 rethrowShmBufferException(e,
178 "FUResourceTable:sendData:finishReadingRecoCell");
182 nbPendingSMDiscards_++;
183 resources_[cellRawIndex]->incNbSent();
184 if (resources_[cellRawIndex]->nbSent() == 1) {
190 sendErrorEvent(cellIndex,
runNumber_, cellEvtNumber,
191 cellFUProcId, cellFUGuid, cellPayloadAddr,
195 "Unknown RecoCell type (neither INIT/DATA/ERROR).";
201 "Failed to send EVENT DATA to StorageManager: "
202 << xcept::stdformat_exception_history(e));
// Publish whether the sendData workloop stays active.
207 sDataActive_=reschedule;
// sendDataWhileHalting() fragment (signature not visible in this excerpt).
// Halting-state counterpart of sendData: no send call appears in the visible
// lines; reco cells are finished and discarded instead.
213 bool reschedule =
true;
216 cell = shmBuffer_->recoCellToRead();
218 rethrowShmBufferException(e,
219 "FUResourceTable:sendDataWhileHalting:recoCellToRead");
// Stop path: discard the cell and set bit 8 of shutdownStatus_.
223 LOG4CPLUS_INFO(log_,
"Don't reschedule sendData workloop.");
226 shmBuffer_->finishReadingRecoCell(cell);
227 shmBuffer_->discardRecoCell(cellIndex);
229 rethrowShmBufferException(e,
230 "FUResourceTable:sendDataWhileHalting:finishReadingRecoCell/discardRecoCell");
232 shutdownStatus_|=1<<8;
// Halting path: the cell is dropped rather than forwarded.
235 LOG4CPLUS_INFO(log_,
"sendData: isHalting, discard recoCell.");
238 shmBuffer_->finishReadingRecoCell(cell);
239 shmBuffer_->discardRecoCell(cellIndex);
241 rethrowShmBufferException(e,
242 "FUResourceTable:sendDataWhileHalting:finishReadingRecoCell/discardRecoCell");
// Publish whether the sendData workloop stays active.
246 sDataActive_=reschedule;
// sendDqm() fragment (signature not visible in this excerpt).
// Reads the next DQM cell and its state, then forwards it via sendDqmEvent.
// `reschedule` starts true and its final value is stored in sDqmActive_.
252 bool reschedule =
true;
257 cell = shmBuffer_->dqmCellToRead();
258 state = shmBuffer_->dqmState(cell->
index());
260 rethrowShmBufferException(e,
261 "FUResourceTable:sendDqm:dqmCellToRead/dqmState");
// Stop path: the cell is finished and discarded, then bit 9 of
// shutdownStatus_ is set. The message is also echoed to stdout.
265 LOG4CPLUS_INFO(log_,
"Don't reschedule sendDqm workloop.");
266 std::cout <<
"shut down dqm workloop " << std::endl;
269 shmBuffer_->finishReadingDqmCell(cell);
270 shmBuffer_->discardDqmCell(cellIndex);
272 rethrowShmBufferException(e,
273 "FUResourceTable:sendDqm:finishReadingDqmCell/discardDqmCell");
275 shutdownStatus_|=1<<9;
// Normal path: send first, then mark the cell as read.
287 sendDqmEvent(cellIndex, cellRunNumber, cellEvtAtUpdate,
288 cellFolderId, cellFUProcId, cellFUGuid, cellPayloadAddr,
291 shmBuffer_->finishReadingDqmCell(cell);
293 rethrowShmBufferException(e,
294 "FUResourceTable:sendDqm:finishReadingDqmCell");
299 "Failed to send DQM DATA to StorageManager: "
300 << xcept::stdformat_exception_history(e));
// Publish whether the sendDqm workloop stays active.
305 sDqmActive_=reschedule;
// sendDqmWhileHalting() fragment (signature not visible in this excerpt).
// Halting-state counterpart of sendDqm: no send call appears in the visible
// lines; DQM cells are finished and discarded instead.
311 bool reschedule =
true;
316 cell = shmBuffer_->dqmCellToRead();
317 state = shmBuffer_->dqmState(cell->
index());
319 rethrowShmBufferException(e,
320 "FUResourceTable:sendDqmWhileHalting:dqmCellToRead/dqmState");
// Stop path: discard the cell and set bit 10 of shutdownStatus_.
324 LOG4CPLUS_INFO(log_,
"Don't reschedule sendDqm workloop.");
325 std::cout <<
"shut down dqm workloop " << std::endl;
328 shmBuffer_->finishReadingDqmCell(cell);
329 shmBuffer_->discardDqmCell(cellIndex);
331 rethrowShmBufferException(e,
332 "FUResourceTable:sendDqmWhileHalting:finishReadingDqmCell/discardDqmCell");
334 shutdownStatus_|=1<<10;
// Second finish/discard pair; the branch that selects it is not visible here.
339 shmBuffer_->finishReadingDqmCell(cell);
340 shmBuffer_->discardDqmCell(cellIndex);
342 rethrowShmBufferException(e,
343 "FUResourceTable:sendDqmWhileHalting:finishReadingDqmCell/discardDqmCell");
// Publish whether the sendDqm workloop stays active.
347 sDqmActive_=reschedule;
// discardNoReschedule() fragment (signature not visible in this excerpt).
// Shutdown sequence as far as the visible lines show: write an empty reco
// event, wait for clients to detach, check the DQM cell states, write an
// empty DQM event, then set isReadyToShutDown_. Progress is recorded in
// bits 11, 12 and 13 of shutdownStatus_.
355 std::cout <<
" entered shutdown cycle " << std::endl;
356 shutdownStatus_|=1<<11;
358 shmBuffer_->writeRecoEmptyEvent();
360 rethrowShmBufferException(e,
361 "FUResourceTable:discardNoReschedule:writeRecoEmptyEvent");
// Bounded wait: the loop condition caps `count` at 100, and each pass sleeps
// shutdownTimeout_ microseconds (usleep). A "LONG Wait (>10s)" message is
// produced once count * shutdownTimeout_ exceeds 10000000.
365 while (count < 100) {
366 std::cout <<
" shutdown cycle " << shmBuffer_->nClients() <<
" "
369 shmBuffer_->shmid()) == 1) {
370 shutdownStatus_|=1<<12;
375 std::cout <<
" shutdown cycle attempt " << count << std::endl;
378 "FUResourceTable: Wait for all clients to detach,"
379 <<
" nClients=" << shmBuffer_->nClients()
381 shmBuffer_->shmid()) <<
" (" << count <<
")");
382 ::usleep( shutdownTimeout_);
383 if (count * shutdownTimeout_ > 10000000)
386 "FUResourceTable:LONG Wait (>10s) for all clients to detach,"
387 <<
" nClients=" << shmBuffer_->nClients()
389 shmBuffer_->shmid()) <<
" (" << count <<
")");
// DQM phase: per-cell states are read via dqmState(i); how allEmpty is
// updated is not visible in this excerpt.
394 bool allEmpty =
false;
395 std::cout <<
"Checking if all dqm cells are empty " << std::endl;
404 state = shmBuffer_->dqmState(
i);
406 rethrowShmBufferException(e,
407 "FUResourceTable:discardNoReschedule:dqmState");
412 shmBuffer_->unlock();
414 shutdownStatus_|=1<<13;
// A non-zero pending-DQM-discard counter at this point is only reported.
416 std::cout <<
"Number of pending discards before declaring ready to shut down: " << nbPendingSMDqmDiscards_ << std::endl;
417 if (nbPendingSMDqmDiscards_ != 0) {
420 "FUResourceTable: pending DQM discards not zero: ="
421 << nbPendingSMDqmDiscards_
422 <<
" while cells are all empty. This may cause problems at next start ");
427 shmBuffer_->writeDqmEmptyEvent();
429 rethrowShmBufferException(e,
430 "FUResourceTable:discardNoReschedule:writeDqmEmptyEvent");
// Signal that shutdown may proceed.
433 isReadyToShutDown_ =
true;
// discard() fragment (signature not visible in this excerpt).
// Takes the next raw cell scheduled for discard, releases it in shared memory
// and returns the associated FU resource to the free queue.
443 cell = shmBuffer_->rawCellToDiscard();
444 state = shmBuffer_->evtState(cell->
index());
// NOTE(review): the label says "rawCellToRead" although the guarded call is
// rawCellToDiscard -- confirm whether the label should be updated.
446 rethrowShmBufferException(e,
447 "FUResourceTable:discard:rawCellToRead/evtState");
450 bool reschedule =
true;
457 LOG4CPLUS_ERROR(log_,
"WARNING! ATTEMPTING TO DISCARD EMPTY CELL!!!");
// Client countdown: decremented only while positive; reaching zero produces
// the "Don't reschedule" message.
462 LOG4CPLUS_INFO(log_,
"nbClientsToShutDown = " << nbClientsToShutDown_);
463 if (nbClientsToShutDown_ > 0)
464 --nbClientsToShutDown_;
465 if (nbClientsToShutDown_ == 0) {
466 LOG4CPLUS_INFO(log_,
"Don't reschedule discard-workloop.");
473 shmBuffer_->discardRawCell(cell);
475 rethrowShmBufferException(e,
"FUResourceTable:discard:discardRawCell");
// Only cells that are neither shutdown nor lumi-section markers have an FU
// resource to release; an out-of-range resource id is reported.
481 if (!shutDown && !isLumi) {
482 if (fuResourceId >= nbResources()) {
485 "cell " << cell->
index() <<
" in state " << state
486 <<
" scheduled for discard has no associated FU resource ");
488 resources_[fuResourceId]->release(
true);
// Invariant: the free queue can never hold more ids than there are resources.
490 freeResourceIds_.push(fuResourceId);
491 assert(freeResourceIds_.size() <= resources_.size());
494 sendDiscard(buResourceId);
500 discardNoReschedule();
// discardWhileHalting() fragment (signature not visible in this excerpt).
// The visible lines mirror discard(): fetch the next raw cell scheduled for
// discard, release it, and recycle the FU resource id.
512 cell = shmBuffer_->rawCellToDiscard();
513 state = shmBuffer_->evtState(cell->
index());
// NOTE(review): the label says "rawCellToRead" although the guarded call is
// rawCellToDiscard -- confirm whether the label should be updated.
515 rethrowShmBufferException(e,
516 "FUResourceTable:discardWhileHalting:rawCellToRead/evtState");
519 bool reschedule =
true;
526 LOG4CPLUS_ERROR(log_,
"WARNING! ATTEMPTING TO DISCARD EMPTY CELL!!!");
// Client countdown: decremented only while positive; reaching zero produces
// the "Don't reschedule" message.
531 LOG4CPLUS_INFO(log_,
"nbClientsToShutDown = " << nbClientsToShutDown_);
532 if (nbClientsToShutDown_ > 0)
533 --nbClientsToShutDown_;
534 if (nbClientsToShutDown_ == 0) {
535 LOG4CPLUS_INFO(log_,
"Don't reschedule discard-workloop.");
542 shmBuffer_->discardRawCell(cell);
544 rethrowShmBufferException(e,
545 "FUResourceTable:discardWhileHalting:discardRawCell");
// Only cells that are neither shutdown nor lumi-section markers have an FU
// resource to release; an out-of-range resource id is reported.
551 if (!shutDown && !isLumi) {
552 if (fuResourceId >= nbResources()) {
555 "cell " << cell->
index() <<
" in state " << state
556 <<
" scheduled for discard has no associated FU resource ");
558 resources_[fuResourceId]->release(
true);
// Invariant: the free queue can never hold more ids than there are resources.
560 freeResourceIds_.push(fuResourceId);
561 assert(freeResourceIds_.size() <= resources_.size());
569 sendDiscard(buResourceId);
574 discardNoReschedule();
// buildResource() fragment (signature not visible in this excerpt).
// Processes one incoming message frame for an FU resource and returns
// eventComplete, which the visible lines set to true only after
// finishWritingRawCell.
582 bool eventComplete =
false;
584 bool lastMsg = isLastMessageOfEvent(bufRef);
// The buffer payload is reinterpreted as an event-data block frame.
585 I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *
block =
586 (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*) bufRef->getDataLocation();
// Sanity check on the ids carried by the frame: values that are negative
// when cast to int, or an FU id at or beyond nbRawCells_, are rejected and
// logged.
591 if ((
int) block->fuTransactionId < 0 || fuResourceId >= nbRawCells_
592 || (int) block->buResourceId < 0) {
593 stringstream failureStr;
594 failureStr <<
"Received TAKE message with invalid bu/fu resource id:"
595 <<
" fuResourceId: " << fuResourceId <<
" buResourceId: "
597 LOG4CPLUS_ERROR(log_, failureStr.str());
600 FUResource* resource = resources_[fuResourceId];
606 cell = shmBuffer_->rawCellToWrite();
608 rethrowShmBufferException(e,
609 "FUResourceTable:buildResource:rawCellToWrite");
613 return eventComplete;
// Timestamp and event count are passed to the frb_ object.
617 gettimeofday(&now, 0);
619 frb_->setRBTimeStamp(
622 frb_->setRBEventCount(nbCompleted_);
// doCrcCheck_ acts as a period: the guarded action runs when nbAllocated_ is
// a multiple of it (and it is positive).
624 if (doCrcCheck_ > 0 && 0 == nbAllocated_ % doCrcCheck_)
631 std::cout <<
"Received frame for resource " << buResourceId << std::endl;
636 std::cout <<
"No fatal error for " << buResourceId <<
", keep building..."<< std::endl;
644 std::cout <<
"Checking if resource is complete " << buResourceId << std::endl;
649 std::cout <<
"@@@@RESOURCE is COMPLETE " << buResourceId << std::endl;
// doDumpEvents_ is likewise a period, applied to nbCompleted_.
655 if (doDumpEvents_ > 0 && nbCompleted_ % doDumpEvents_ == 0)
658 shmBuffer_->finishWritingRawCell(resource->
shmCell());
660 rethrowShmBufferException(e,
661 "FUResourceTable:buildResource:finishWritingRawCell");
663 eventComplete =
true;
// Release path: hand the raw cell back, recycle the FU resource id and send
// a discard for the BU resource. The condition selecting this path is not
// visible in this excerpt.
671 shmBuffer_->releaseRawCell(resource->
shmCell());
673 rethrowShmBufferException(e,
674 "FUResourceTable:buildResource:releaseRawCell");
678 freeResourceIds_.push(fuResourceId);
683 bu_->sendDiscard(buResourceId);
689 return eventComplete;
// discardDataEvent() fragment (signature not visible in this excerpt).
// Handles a DATA discard message: validates the reco index, then discards the
// reco cell only if a discard is currently expected for it.
699 if ((
int) msg->
rbBufferID < 0 || recoIndex >= nbRecoCells_)
702 "Received DISCARD DATA message with invalid recoIndex:"
// Expected discard: decrement the pending counter, clear the per-cell flag
// and release the cell.
705 if (acceptSMDataDiscard_[recoIndex]) {
707 nbPendingSMDiscards_--;
709 acceptSMDataDiscard_[recoIndex] =
false;
712 shmBuffer_->discardRecoCell(recoIndex);
714 rethrowShmBufferException(e,
715 "FUResourceTable:discardDataEvent:discardRecoCell");
// Unexpected discard: logged only.
719 LOG4CPLUS_ERROR(log_,
"Spurious DATA discard by StorageManager, skip!");
// discardDataEventWhileHalting() fragment (signature not visible in this
// excerpt). Same validation and bookkeeping as discardDataEvent; no
// discardRecoCell call appears in the visible lines.
733 if ((
int) msg->
rbBufferID < 0 || recoIndex >= nbRecoCells_)
736 "Received DISCARD DATA message with invalid recoIndex:"
// Expected discard: decrement the pending counter and clear the per-cell flag.
739 if (acceptSMDataDiscard_[recoIndex]) {
741 nbPendingSMDiscards_--;
743 acceptSMDataDiscard_[recoIndex] =
false;
// Unexpected discard: logged only.
746 LOG4CPLUS_ERROR(log_,
"Spurious DATA discard by StorageManager, skip!");
// discardDqmEvent() fragment (signature not visible in this excerpt).
// Handles a DQM discard message: validates the index, waits for the cell to
// reach dqm::SENT, then updates the discard bookkeeping and releases the cell.
760 if ((
int) msg->
rbBufferID < 0 || dqmIndex >= nbDqmCells_)
763 "Received DISCARD DQM message with invalid dqmIndex:"
// Poll until the cell is in SENT state; ntries counts the attempts and a
// time-out message exists below (the limit itself is not visible here).
766 unsigned int ntries = 0;
768 while (shmBuffer_->dqmState(dqmIndex) !=
dqm::SENT) {
772 "DQM discard for cell " << dqmIndex
773 <<
" which is not yet in SENT state - waiting");
778 "DQM cell " << dqmIndex
779 <<
" discard timed out while cell still in state "
780 << shmBuffer_->dqmState(dqmIndex));
786 rethrowShmBufferException(e,
"FUResourceTable:discardDqmEvent:dqmState");
// The per-cell entry is a counter, not a flag: it is decremented once per
// accepted discard, together with the global pending counter. A global
// counter that drops below zero is reported.
788 if (acceptSMDqmDiscard_[dqmIndex] > 0) {
789 acceptSMDqmDiscard_[dqmIndex]--;
790 if (--nbPendingSMDqmDiscards_ < 0) {
793 "Spurious??? DQM discard by StorageManager, index "
794 << dqmIndex <<
" cell state "
795 << shmBuffer_->dqmState(dqmIndex)
796 <<
" accept flag " << acceptSMDqmDiscard_[dqmIndex]);
799 shmBuffer_->discardDqmCell(dqmIndex);
801 rethrowShmBufferException(e,
802 "FUResourceTable:discardDqmEvent:discardDqmCell");
// Cell not accepting discards: reported only.
808 "Spurious DQM discard for cell " << dqmIndex
809 <<
" from StorageManager while cell is not accepting discards");
// discardDqmEventWhileHalting() fragment (signature not visible in this
// excerpt). Same validation, SENT-state wait and counter bookkeeping as
// discardDqmEvent; no discardDqmCell call appears in the visible lines.
823 if ((
int) msg->
rbBufferID < 0 || dqmIndex >= nbDqmCells_)
826 "Received DISCARD DQM message with invalid dqmIndex:"
// Poll until the cell is in SENT state; ntries counts the attempts.
829 unsigned int ntries = 0;
831 while (shmBuffer_->dqmState(dqmIndex) !=
dqm::SENT) {
835 "DQM discard for cell " << dqmIndex
836 <<
" which is not yet in SENT state - waiting");
841 "DQM cell " << dqmIndex
842 <<
" discard timed out while cell still in state "
843 << shmBuffer_->dqmState(dqmIndex));
849 rethrowShmBufferException(e,
850 "FUResourceTable:discardDqmEventWhileHalting:dqmState(2)");
// Per-cell counter and global pending counter are decremented together; a
// global counter that drops below zero is reported.
852 if (acceptSMDqmDiscard_[dqmIndex] > 0) {
853 acceptSMDqmDiscard_[dqmIndex]--;
854 if (--nbPendingSMDqmDiscards_ < 0) {
858 "Spurious??? DQM discard by StorageManager, index "
859 << dqmIndex <<
" cell state "
860 << shmBuffer_->dqmState(dqmIndex)
862 << acceptSMDqmDiscard_[dqmIndex]);
864 rethrowShmBufferException(e,
865 "FUResourceTable:discardDqmEventWhileHalting:dqmState");
// Cell not accepting discards: reported only.
872 "Spurious DQM discard for cell " << dqmIndex
873 <<
" from StorageManager while cell is not accepting discards");
// postEndOfLumiSection() fragment (signature not visible in this excerpt).
// Interprets the buffer payload as an end-of-lumisection frame and writes a
// lumi-section event into the raw cells.
882 I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME
884 (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *) bufRef->getDataLocation();
// The lumi-section number is checked as a signed int; the failing condition
// itself is not visible in this excerpt.
889 int lumiCheck = (int) msg->lumiSection;
891 LOG4CPLUS_ERROR(log_,
892 "Received EOL message with invalid index:" << lumiCheck);
// One write per raw cell (loop bound nbRawCells_); the loop ends early once
// stopFlag_ is set.
894 for (
unsigned int i = 0;
i < nbRawCells_;
i++) {
896 if (stopFlag_)
break;
899 shmBuffer_->writeRawLumiSectionEvent(msg->lumiSection);
901 rethrowShmBufferException(e,
902 "FUResourceTable:postEndOfLumiSection:writeRawLumiSectionEvent");
// dropEvent() fragment (signature not visible in this excerpt).
// Reads the next raw cell and immediately schedules it for discard, so the
// event is dropped without being processed in the visible lines.
911 cell = shmBuffer_->rawCellToRead();
913 rethrowShmBufferException(e,
"FUResourceTable:dropEvent:rawCellToRead");
917 shmBuffer_->finishReadingRawCell(cell);
918 shmBuffer_->scheduleRawCellForDiscard(fuResourceId);
920 rethrowShmBufferException(e,
921 "FUResourceTable:dropEvent:finishReadingRawCell/scheduleRawCellForDiscard");
// handleCrashedEP() fragment (signature not visible in this excerpt).
// Looks up the raw cell owned by process `pid`, writes its data to the error
// stream when one is found, and removes the pid from the client list.
928 vector < pid_t > pids = cellPrcIds();
// iRawCell starts at pids.size(), which serves as the "not found" value
// tested below.
929 UInt_t iRawCell = pids.size();
930 for (
UInt_t i = 0;
i < pids.size();
i++) {
931 if (pid == pids[
i]) {
// A cell was found for this pid: send its content to the error stream.
937 if (iRawCell < pids.size()) {
939 bool shmret = shmBuffer_->writeErrorEventData(runNumber, pid, iRawCell,
true);
941 LOG4CPLUS_WARN(log_,
"Problem writing to the error stream.");
943 rethrowShmBufferException(e,
944 "FUResourceTable:handleCrashedEP:writeErrorEventData");
949 "No raw data to send to error stream for process " << pid);
// The pid is removed from the shared-memory client index; the removal
// result is kept in `success`.
951 bool success = shmBuffer_->removeClientPrcId(pid);
954 "removeClientPrcId: " << pid <<
" not in shared memory index, was in raw cell " << iRawCell);
956 rethrowShmBufferException(e,
957 "FUResourceTable:handleCrashedEP:removeClientPrcId");
// shutdownWatchdog() fragment (signature not visible in this excerpt).
// Monitors client shutdown until watchDogEnd_ is set, warning at the halfway
// point and flagging failure when the time budget is nearly used up.
// Time budget in microseconds (timeout is scaled by 1000000, plus one).
// NOTE(review): with a 32-bit unsigned int this wraps for timeout values
// above roughly 4294 -- confirm callers stay well below that.
965 unsigned int timeoutUs=timeout*1000000+1;
967 while (!watchDogEnd_) {
// 50000 us or less remaining: report the time-out together with the
// shutdownStatus_ bitmask (printed in hex) and mark the failure.
971 if (timeoutUs<=50000) {
972 LOG4CPLUS_ERROR(log_,
"Timeout in shutdownClients, status:"<< std::hex << shutdownStatus_);
973 watchDogSetFailed_=
true;
// Half of the budget used: warn once (guarded by `warned`).
976 if (timeoutUs<=1000000*timeout/2 && !warned) {
978 LOG4CPLUS_WARN(log_,
"Long shutdown of clients, status:" << std::hex << shutdownStatus_);
// shutDownClients() fragment (signature not visible in this excerpt).
// Drives the stop sequence for attached client processes. Progress is
// recorded in bits 1 through 6 of shutdownStatus_, and an evf::Exception is
// raised at the end if the watchdog flagged a time-out.
985 nbClientsToShutDown_ = nbClients();
986 isReadyToShutDown_ =
false;
992 watchDogSetFailed_=
false;
// No clients attached (bit 1): orphaned raw cells are scheduled for discard
// server-side, followed by an empty cell.
996 if (nbClientsToShutDown_ == 0) {
997 shutdownStatus_|=1<<1;
1000 "No clients to shut down. Checking if there are raw cells not assigned to any process yet");
1008 "Schedule discard at STOP for orphaned event in state "
1010 shmBuffer_->scheduleRawCellForDiscardServerSide(
i);
1013 shmBuffer_->scheduleRawEmptyCellForDiscard();
1015 rethrowShmBufferException(e,
1016 "FUResourceTable:shutDownClients:evtState/scheduleRawEmptyCellForDiscard");
// Clients attached (bit 2): loop while fewer raw cells are writable than
// there are clients (the rest of the condition is not visible here).
1022 while (shmBuffer_->nbRawCellsToWrite() < nbClients() && nbClients()
1024 shutdownStatus_|=1<<2;
// Timed acquisition of the crash-handler lock (argument 10; the unit is not
// visible in this excerpt).
1028 auto lk = lockCrashHandlerTimed(10);
// kill(pid, 0) sends no signal; per POSIX it only performs the existence and
// permission checks for each client process.
1033 vector < pid_t > prcids = clientPrcIds();
1034 for (
UInt_t i = 0;
i < prcids.size();
i++) {
1035 pid_t
pid = prcids[
i];
1036 int status = kill(pid, 0);
1038 LOG4CPLUS_ERROR(log_,
1039 "EP prc " << pid <<
" completed with error.");
1046 "Timed out access to the Crash Handler in stop. SM discards not arriving?");
1053 "no cell to write stop "
1054 << shmBuffer_->nbRawCellsToWrite()
1055 <<
" nClients " << nbClients());
1057 string msg =
"No Raw Cell to Write STOP messages";
1062 shutdownStatus_|=1<<3;
1069 rethrowShmBufferException(e,
1070 "FUResourceTable:shutDownClients:nbRawCellsToWrite");
// Re-read the client count after the wait; if none remain (bit 4), orphaned
// events are marked for discard and scheduled server-side.
1072 nbClientsToShutDown_ = nbClients();
1073 if (nbClientsToShutDown_ == 0) {
1074 shutdownStatus_|=1<<4;
1080 state = shmBuffer_->evtState(
i);
1086 rethrowShmBufferException(e,
1087 "FUResourceTable:shutDownClients:evtState");
1092 "Schedule discard at STOP for orphaned event in state "
1095 shmBuffer_->setEvtDiscard(
i, 1,
true);
1096 shmBuffer_->scheduleRawCellForDiscardServerSide(
i);
1102 rethrowShmBufferException(e,
1103 "FUResourceTable:shutDownClients:scheduleRawCellForDiscardServerSide");
1108 shmBuffer_->scheduleRawEmptyCellForDiscard();
1114 rethrowShmBufferException(e,
1115 "FUResourceTable:shutDownClients:scheduleRawEmptyCellForDiscard");
// Clients still attached (bit 5): empty raw events are written (the loop
// using `n` is not visible here); bit 6 marks completion of this step.
1118 UInt_t n = nbClientsToShutDown_;
1119 shutdownStatus_|=1<<5;
1122 shmBuffer_->writeRawEmptyEvent();
1128 rethrowShmBufferException(e,
1129 "FUResourceTable:shutDownClients:writeRawEmptyEvent");
1131 shutdownStatus_|=1<<6;
// A watchdog-reported time-out turns into an exception for the caller.
1136 if (watchDogSetFailed_)
1137 XCEPT_RAISE(
evf::Exception,
"Failed (timed out) shutdown of clients");
// Clean-up fragment (presumably clear() -- the signature is not visible in
// this excerpt). Releases and deletes every FUResource, then empties the
// free-id queue.
1143 for (
UInt_t i = 0;
i < resources_.size();
i++) {
// release(true) is called before the object is destroyed.
1144 resources_[
i]->release(
true);
1145 delete resources_[
i];
// Drain the queue one element at a time.
1148 while (!freeResourceIds_.empty())
1149 freeResourceIds_.pop();
// resetCounters() fragment (signature not visible in this excerpt).
// Resets the per-cell discard bookkeeping and the pending/EOL counters.
// The per-cell arrays are only touched when a shared-memory buffer exists,
// because the loop bounds are read from it.
1158 if (0 != shmBuffer_) {
1160 for (
UInt_t i = 0;
i < shmBuffer_->nRecoCells();
i++)
1161 acceptSMDataDiscard_[
i] =
false;
1162 for (
UInt_t i = 0;
i < shmBuffer_->nDqmCells();
i++)
1163 acceptSMDqmDiscard_[
i] = 0;
1165 rethrowShmBufferException(e,
1166 "FUResourceTable:resetCounters:nRecoCells/nDqmCells");
// Global counters.
1177 nbPendingSMDiscards_ = 0;
1178 nbPendingSMDqmDiscards_ = 0;
1183 nbEolDiscarded_ = 0;
// nbClients() fragment (signature not visible in this excerpt).
// The client count is read from the shared-memory buffer only when one
// exists; the value of `result` otherwise is set outside the visible lines.
1202 if (0 != shmBuffer_)
1203 result = shmBuffer_->nClients();
1205 rethrowShmBufferException(e,
"FUResourceTable:nbClients:nClients");
// clientPrcIds() fragment (signature not visible in this excerpt).
// Collects one client process id per index into `result`; nothing is read
// when there is no shared-memory buffer.
1214 if (0 != shmBuffer_) {
1217 result.push_back(shmBuffer_->clientPrcId(
i));
1220 rethrowShmBufferException(e,
1221 "FUResourceTable:clientPrcIds:clientPrcIds");
// clientPrcIdsAsString() fragment (signature not visible in this excerpt).
// Streams each client process id into `ss`; the separator used between ids
// is not visible here.
1230 if (0 != shmBuffer_) {
1235 ss << shmBuffer_->clientPrcId(
i);
1239 rethrowShmBufferException(e,
1240 "FUResourceTable:clientPrcIdsAsString:clientPrcId");
// cellStates() fragment (signature not visible in this excerpt).
// Builds a list of state names for the event cells; the conditions that
// select each name are not visible here, only the names themselves.
1247 vector < string >
result;
1248 if (0 != shmBuffer_) {
// One of the following names is appended per cell.
1255 result.push_back(
"EMPTY");
1257 result.push_back(
"STOP");
1259 result.push_back(
"LUMISECTION");
1262 result.push_back(
"USEDLS");
1264 result.push_back(
"RAWWRITING");
1266 result.push_back(
"RAWWRITTEN");
1268 result.push_back(
"RAWREADING");
1270 result.push_back(
"RAWREAD");
1272 result.push_back(
"PROCESSING");
1274 result.push_back(
"PROCESSED");
1276 result.push_back(
"RECOWRITING");
1278 result.push_back(
"RECOWRITTEN");
1280 result.push_back(
"SENDING");
1282 result.push_back(
"SENT");
1284 result.push_back(
"DISCARDING");
1287 rethrowShmBufferException(e,
"FUResourceTable:cellStates:evtState");
// The buffer is unlocked after the scan; the matching lock call is not
// visible in this excerpt.
1289 shmBuffer_->unlock();
// dqmCellStates() fragment (signature not visible in this excerpt).
// Builds a list of state names for the DQM cells; the conditions that select
// each name are not visible here, only the names themselves.
1295 vector < string >
result;
1296 if (0 != shmBuffer_) {
// One of the following names is appended per cell.
1303 result.push_back(
"EMPTY");
1305 result.push_back(
"WRITING");
1307 result.push_back(
"WRITTEN");
1309 result.push_back(
"SENDING");
1311 result.push_back(
"SENT");
1313 result.push_back(
"DISCARDING");
1316 rethrowShmBufferException(e,
1317 "FUResourceTable:dqmCellStates:dqmState");
// The buffer is unlocked after the scan; the matching lock call is not
// visible in this excerpt.
1319 shmBuffer_->unlock();
// cellEvtNumbers() fragment (signature not visible in this excerpt).
// Collects the event number of each cell into `result`; the result stays
// empty when there is no shared-memory buffer.
1326 vector < UInt_t >
result;
1327 if (0 != shmBuffer_) {
1332 result.push_back(shmBuffer_->evtNumber(
i));
1334 rethrowShmBufferException(e,
1335 "FUResourceTable:cellEvtNumbers:evtNumber");
// Unlock follows the error-handling lines; the matching lock call is not
// visible in this excerpt.
1337 shmBuffer_->unlock();
// cellPrcIds() fragment (signature not visible in this excerpt).
// Collects the process id recorded for each event cell into `result`.
1345 if (0 != shmBuffer_) {
1350 result.push_back(shmBuffer_->evtPrcId(
i));
1352 rethrowShmBufferException(e,
"FUResourceTable:cellPrcIds:evtPrcId");
// Unlock follows the error-handling lines; the matching lock call is not
// visible in this excerpt.
1354 shmBuffer_->unlock();
// cellTimeStamps() fragment (signature not visible in this excerpt).
// Collects the time stamp of each event cell into `result`.
1361 vector < time_t >
result;
1363 if (0 != shmBuffer_) {
1367 result.push_back(shmBuffer_->evtTimeStamp(
i));
// NOTE(review): here unlock() comes before the rethrow lines, whereas the
// sibling accessors (cellEvtNumbers, cellPrcIds, cellStates) place it after
// them -- confirm the lock is released on the exception path as well.
1368 shmBuffer_->unlock();
1371 rethrowShmBufferException(e,
1372 "FUResourceTable:cellTimeStamps:evtTimeStamp");
// lastResort() fragment (signature not visible in this excerpt).
// Drains every raw cell still waiting to be read by scheduling it for discard
// server-side, then schedules an empty cell. Each step is logged at WARN
// level and partly echoed to stdout.
1384 ost <<
"lastResort: " << shmBuffer_->nbRawCellsToRead()
1385 <<
" more rawcells to read ";
1386 LOG4CPLUS_WARN(log_,ost.str());
// Loop until the buffer reports no more raw cells to read.
1389 while (shmBuffer_->nbRawCellsToRead() != 0) {
1391 std::cout <<
"lastResort: " << shmBuffer_->nbRawCellsToRead()
1394 LOG4CPLUS_WARN(log_,
"lastResort: Scheduling raw cell (server side) "<< newCell->
index());
1395 shmBuffer_->scheduleRawCellForDiscardServerSide(newCell->
index());
1397 std::cout <<
"lastResort: schedule raw cell for discard "
1398 << newCell->
index() << std::endl;
// After the drain: one empty raw cell is scheduled for discard.
1401 LOG4CPLUS_WARN(log_,
"lastResort: scheduling empty raw cell (server side) ");
1402 shmBuffer_->scheduleRawEmptyCellForDiscard();
1403 LOG4CPLUS_WARN(log_,
"lastResort: Finished. cells remaining: " << shmBuffer_->nbRawCellsToRead());
1405 rethrowShmBufferException(
1407 "FUResourceTable:lastResort:nbRawCellsToRead/scheduleRawCellForDiscardServerSide");
1409 LOG4CPLUS_WARN(log_,
"Last resort finished ");
// resetIPC() fragment (signature not visible in this excerpt).
// Waits for the send workloops to go inactive, then resets the shared-memory
// buffer. Nothing happens when there is no buffer.
1413 if (shmBuffer_ != 0) {
// Poll every 50000 us while either workloop flag is still set, decrementing
// countdown_ on each evaluation (its starting value is not visible here).
// NOTE(review): countdown_ is decremented before the flags are tested, so a
// workloop that finishes exactly on the last tick leaves countdown_ at 0 and
// would still trigger the time-out message below -- confirm this is intended.
1416 while (countdown_-- && (sDataActive_ || sDqmActive_)) ::usleep(50000);
1417 if (countdown_<=0) {
1418 std::ostringstream ostr;
1419 ostr <<
"Resource broker timed out waiting for workloop shutdowns (3 seconds). Continuing to reset Shm. States - "
1420 <<
" sendDqm:"<<sDqmActive_ <<
" sendData:" << sDataActive_;
1421 LOG4CPLUS_ERROR(log_,ostr.str());
// The reset goes ahead even after a time-out, as the message above states.
1425 shmBuffer_->reset(
false);
1426 LOG4CPLUS_INFO(log_,
"ShmBuffer was reset!");
// printStatus() fragment (signature not visible in this excerpt).
// Returns the buffer's sem_print_s() string, or a fixed placeholder when no
// shared-memory buffer exists.
1431 if (shmBuffer_)
return shmBuffer_->sem_print_s();
1432 else return std::string(
"ShmBuffer not initialized");
// rethrowShmBufferException() fragment (signature not visible in this
// excerpt). Builds a diagnostic string from the caught exception, a dump of
// all data and DQM cell states, and the caller-supplied `where` label. How
// the string is used afterwards is not visible here.
1437 stringstream details;
// Snapshot the cell states first; both helpers query the shared-memory buffer.
1438 vector < string > dataStates = cellStates();
1439 vector < string > dqmStates = dqmCellStates();
// Exception message plus the module and function it was raised in.
1440 details <<
"Exception raised: " <<
e.what() <<
" (in module: "
1441 <<
e.module() <<
" in function: " <<
e.function() <<
" at line: "
1443 details <<
" Dumping cell state... ";
// Space-separated state names, data cells first, then DQM cells.
1444 details <<
"data cells --> ";
1445 for (
unsigned int i = 0;
i < dataStates.size();
i++)
1446 details << dataStates[
i] <<
" ";
1447 details <<
"dqm cells --> ";
1448 for (
unsigned int i = 0;
i < dqmStates.size();
i++)
1449 details << dqmStates[
i] <<
" ";
// `where` identifies the call site, in the "Class:function:call" form used
// by the callers throughout this file.
1450 details <<
" ... originated in: " << where;
unsigned int index() const
static const char runNumber_[]
unsigned int fuGuid() const
bool discardDataEvent(MemRef_t *bufRef)
static int shm_nattch(int shmid)
void process(MemRef_t *bufRef)
std::vector< UInt_t > cellEvtNumbers() const
unsigned int type() const
unsigned int index() const
unsigned int runNumber() const
std::vector< std::string > cellStates() const
unsigned int fuGuid() const
FUResourceTable(bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize, int freeResReq, BUProxy *bu, SMProxy *sm, log4cplus::Logger logger, unsigned int, EvffedFillerRB *frb, xdaq::Application *)
unsigned int folderId() const
bool handleCrashedEP(UInt_t runNumber, pid_t pid)
UInt_t nbErrors(bool reset=true)
unsigned int eventSize() const
toolbox::mem::Reference MemRef_t
std::string printStatus()
unsigned int evtAtUpdate() const
bool buildResource(MemRef_t *bufRef)
unsigned int fuProcessId() const
static bool releaseSharedMemory()
void release(bool detachResource)
unsigned char * payloadAddr() const
unsigned int evtNumber() const
unsigned int nExpectedEPs() const
unsigned int rawCellIndex() const
unsigned int fuProcessId() const
bool discardWhileHalting(bool sendDiscards)
unsigned int runNumber() const
void shutdownWatchdog(unsigned int timeout)
void clear(CLHEP::HepGenMatrix &m)
Helper function: Reset all elements of a matrix to 0.
std::vector< time_t > cellTimeStamps() const
bool discardDqmEventWhileHalting(MemRef_t *bufRef)
std::vector< pid_t > clientPrcIds() const
block
Formatting index page's pieces.
static FUShmBuffer * createShmBuffer(bool semgmentationMode, unsigned int nRawCells, unsigned int nRecoCells, unsigned int nDqmCells, unsigned int rawCellSize=0x400000, unsigned int recoCellSize=0x400000, unsigned int dqmCellSize=0x400000)
unsigned char * payloadAddr() const
evf::FUShmRawCell * shmCell()
std::string clientPrcIdsAsString() const
unsigned long long uint64_t
unsigned int eventSize() const
void discardNoReschedule()
void doCrcCheck(bool doCrcCheck)
unsigned int index() const
std::vector< pid_t > cellPrcIds() const
unsigned int outModId() const
unsigned int fuResourceId() const
UInt_t nbCrcErrors(bool reset=true)
void postEndOfLumiSection(MemRef_t *bufRef)
virtual ~FUResourceTable()
std::vector< std::string > dqmCellStates() const
bool sendDataWhileHalting()
void rethrowShmBufferException(evf::Exception &e, std::string where) const
void allocate(FUShmRawCell *shmCell)
bool discardDqmEvent(MemRef_t *bufRef)
unsigned int buResourceId() const
bool discardDataEventWhileHalting(MemRef_t *bufRef)
void resetIPC()
Reset the ShmBuffer to its initial state.
bool sendDqmWhileHalting()
void initialize(bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize)