20 #include "interface/evb/i2oEVBMsgs.h"
21 #include "xcept/tools.h"
31 using std::stringstream;
45 IPCMethod(segmentationMode, nbRawCells, nbRecoCells, nbDqmCells,
46 rawCellSize, recoCellSize, dqmCellSize, freeResReq, bu, sm,
47 logger, timeout, frb, app), msq_(99) {
50 initialize(segmentationMode, nbRawCells, nbRecoCells, nbDqmCells,
51 rawCellSize, recoCellSize, dqmCellSize);
61 LOG4CPLUS_INFO(
log_,
"MESSAGE QUEUE SUCCESSFULLY RELEASED.");
81 rawCellSize_ = rawCellSize;
82 recoCellSize_ = recoCellSize;
83 dqmCellSize_ = dqmCellSize;
87 if (0 == &msq_ || 0 == msq_.id()) {
88 string msg =
"CREATION OF MESSAGE QUEUE FAILED!";
89 LOG4CPLUS_FATAL(log_, msg);
94 cache_->initialise(nbRawCells, rawCellSize);
97 for (
UInt_t i = 0;
i < nbRawCells_;
i++) {
100 resources_.push_back(newResource);
101 freeResourceIds_.push(
i);
113 bool reschedule =
true;
120 cout <<
"RCV failed!" << endl;
130 LOG4CPLUS_INFO(
log_,
"Don't reschedule sendData workloop.");
140 LOG4CPLUS_INFO(
log_,
"sendData: isHalting, discard recoCell.");
150 if (cell->
type() == 0) {
165 cellFUGuid, cellPayloadAddr, cellEventSize,
171 }
else if (cell->
type() == 1) {
190 cellOutModId, cellFUProcId, cellFUGuid,
191 cellPayloadAddr, cellEventSize);
195 }
else if (cell->
type() == 2) {
216 cellFUProcId, cellFUGuid, cellPayloadAddr,
220 "Unknown RecoCell type (neither INIT/DATA/ERROR).";
226 "Failed to send EVENT DATA to StorageManager: "
227 << xcept::stdformat_exception_history(e));
237 bool reschedule =
true;
244 cout <<
"RCV failed!" << endl;
254 LOG4CPLUS_INFO(
log_,
"Don't reschedule sendData workloop.");
264 LOG4CPLUS_INFO(
log_,
"sendData: isHalting, discard recoCell.");
274 if (cell->
type() == 0) {
289 cellFUGuid, cellPayloadAddr, cellEventSize,
295 }
else if (cell->
type() == 1) {
315 cellOutModId, cellFUProcId, cellFUGuid,
316 cellPayloadAddr, cellEventSize);
320 }
else if (cell->
type() == 2) {
341 cellFUProcId, cellFUGuid, cellPayloadAddr,
345 "Unknown RecoCell type (neither INIT/DATA/ERROR).";
351 "Failed to send EVENT DATA to StorageManager: "
352 << xcept::stdformat_exception_history(e));
363 bool reschedule =
true;
372 cout <<
"RCV failed!" << endl;
381 LOG4CPLUS_WARN(
log_,
"Don't reschedule sendDqm workloop.");
382 cout <<
"shut down dqm workloop " << endl;
406 cellFolderId, cellFUProcId, cellFUGuid, cellPayloadAddr,
412 "Failed to send DQM DATA to StorageManager: "
413 << xcept::stdformat_exception_history(e));
423 bool reschedule =
true;
432 cout <<
"RCV failed!" << endl;
441 LOG4CPLUS_WARN(
log_,
"Don't reschedule sendDqm workloop.");
442 cout <<
"shut down dqm workloop " << endl;
466 cellFolderId, cellFUProcId, cellFUGuid, cellPayloadAddr,
472 "Failed to send DQM DATA to StorageManager: "
473 << xcept::stdformat_exception_history(e));
485 bool reschedule =
true;
494 cout <<
"RCV failed!" << endl;
499 unsigned int* pBuID = (
unsigned int*) discardRaw->mtext;
500 unsigned int* pFuID = (
unsigned int*) (discardRaw->mtext
501 +
sizeof(
unsigned int));
503 unsigned int buResourceId = *pBuID;
504 unsigned int fuResourceId = *pFuID;
506 cout <<
"Discard received for buResourceID: " << buResourceId
507 <<
" fuResourceID " << fuResourceId << endl << endl;
627 bool reschedule =
true;
636 cout <<
"RCV failed!" << endl;
641 unsigned int* pBuID = (
unsigned int*) discardRaw->mtext;
642 unsigned int* pFuID = (
unsigned int*) (discardRaw->mtext
643 +
sizeof(
unsigned int));
645 unsigned int buResourceId = *pBuID;
646 unsigned int fuResourceId = *pFuID;
648 cout <<
"Discard received for buResourceID: " << buResourceId
649 <<
" fuResourceID " << fuResourceId << endl << endl;
773 bool eventComplete =
false;
775 I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *
block =
776 (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*) bufRef->getDataLocation();
791 gettimeofday(&now, 0);
836 string errmsg =
"Failed to post message to Queue!";
837 LOG4CPLUS_FATAL(
log_, errmsg);
851 eventComplete =
true;
872 return eventComplete;
1072 bool retval =
false;
unsigned int index() const
unsigned int fuGuid() const
bool sendDqmWhileHalting()
int postLength(MsgBuf &ptr, unsigned int length)
bool handleCrashedEP(UInt_t runNumber, pid_t pid)
void process(MemRef_t *bufRef)
void sendDiscard(UInt_t buResourceId)
unsigned int type() const
void initialize(bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize)
unsigned int index() const
unsigned int runNumber() const
std::vector< pid_t > clientPrcIds() const
unsigned int fuGuid() const
void setRBEventCount(uint32_t evtcnt)
unsigned int folderId() const
void sendDiscard(UInt_t buResourceId)
UInt_t nbErrors(bool reset=true)
unsigned int eventSize() const
static const unsigned int DISCARD_RAW_MESSAGE_TYPE
toolbox::mem::Reference MemRef_t
unsigned int evtAtUpdate() const
std::vector< std::string > cellStates() const
bool isLastMessageOfEvent(MemRef_t *bufRef)
unsigned int fuProcessId() const
void release(bool detachResource)
virtual ~FUResourceQueue()
RawMsgBuf * getMsgToWrite()
unsigned char * payloadAddr() const
unsigned int evtNumber() const
UInt_t nbPendingSMDiscards_
unsigned int nExpectedEPs() const
unsigned int rawCellIndex() const
unsigned int fuProcessId() const
void initialize(unsigned int index)
static RawCache * getInstance()
std::vector< time_t > cellTimeStamps() const
unsigned int runNumber() const
bool sendDataWhileHalting()
FUResourceQueue(bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize, int freeResReq, BUProxy *bu, SMProxy *sm, log4cplus::Logger logger, unsigned int, EvffedFillerRB *frb, xdaq::Application *)
void clear(CLHEP::HepGenMatrix &m)
Helper function: Reset all elements of a matrix to 0.
void postEndOfLumiSection(MemRef_t *bufRef)
bool discardDqmEventWhileHalting(MemRef_t *bufRef)
void sendDqmEvent(UInt_t fuDqmId, UInt_t runNumber, UInt_t evtAtUpdate, UInt_t folderId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
FUResourceVec_t resources_
static const unsigned int RECO_MESSAGE_TYPE
block
Formating index page's pieces.
void releaseMsg(unsigned int fuResourceId)
unsigned char * payloadAddr() const
unsigned long long uint64_t
bool buildResource(MemRef_t *bufRef)
unsigned int eventSize() const
std::atomic< int > nbPendingSMDqmDiscards_
void doCrcCheck(bool doCrcCheck)
void sendErrorEvent(UInt_t fuResourceId, UInt_t runNumber, UInt_t evtNumber, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
std::string clientPrcIdsAsString() const
unsigned int outModId() const
bool discardDataEvent(MemRef_t *bufRef)
bool discardDqmEvent(MemRef_t *bufRef)
bool rcvQuiet(MsgBuf &ptr)
bool discardDataEventWhileHalting(MemRef_t *bufRef)
void sendDataEvent(UInt_t fuResourceId, UInt_t runNumber, UInt_t evtNumber, UInt_t outModId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
std::vector< std::string > dqmCellStates() const
FUShmRecoCell * recoCell()
std::queue< UInt_t > freeResourceIds_
UInt_t nbCrcErrors(bool reset=true)
void setRBTimeStamp(uint64_t ts)
void sendInitMessage(UInt_t fuResourceId, UInt_t outModId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize, UInt_t nExpectedEPs)
static const unsigned int DQM_MESSAGE_TYPE
std::vector< UInt_t > cellEvtNumbers() const
void allocate(FUShmRawCell *shmCell)
std::vector< pid_t > cellPrcIds() const
bool discardWhileHalting(bool sendDiscards)