00001
00002
00003
00004
00005
00006
00007
00008
00009
00011
00012 #include "EventFilter/ResourceBroker/interface/FUResourceTable.h"
00013 #include "EventFilter/ResourceBroker/interface/IPCMethod.h"
00014
00015 #include "interface/evb/i2oEVBMsgs.h"
00016
00017 #include <iomanip>
00018
00019 using std::ofstream;
00020 using std::endl;
00021 using namespace evf;
00022
00024
00026
00027
00028 IPCMethod::IPCMethod(bool segmentationMode, UInt_t nbRawCells,
00029 UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize,
00030 UInt_t recoCellSize, UInt_t dqmCellSize, int freeResReq, BUProxy *bu,
00031 SMProxy *sm, log4cplus::Logger logger, unsigned int timeout,
00032 EvffedFillerRB *frb, xdaq::Application*app) throw (evf::Exception) :
00033 bu_(bu), sm_(sm), log_(logger), nbDqmCells_(nbDqmCells),
00034 nbRawCells_(nbRawCells), nbRecoCells_(nbRecoCells),
00035 acceptSMDataDiscard_(0), acceptSMDqmDiscard_(0), doCrcCheck_(1),
00036 shutdownTimeout_(timeout), nbPending_(0), nbClientsToShutDown_(0),
00037 isReadyToShutDown_(true), isActive_(false), runNumber_(0xffffffff),
00038 frb_(frb), app_(app) {
00039
00040
00041
00042 if (freeResReq < 0)
00043 freeResRequiredForAllocate_ = nbRawCells_ / 2;
00044 else
00045 freeResRequiredForAllocate_ = freeResReq;
00046
00047 sem_init(&lock_, 0, 1);
00048
00049 }
00050
00051 IPCMethod::~IPCMethod() {
00052
00053 }
00054
00056
00058
00059
00060 UInt_t IPCMethod::allocateResource() {
00061 assert(!freeResourceIds_.empty());
00062
00063 lock();
00064 UInt_t fuResourceId = freeResourceIds_.front();
00065 freeResourceIds_.pop();
00066 nbPending_++;
00067 nbAllocated_++;
00068 unlock();
00069
00070 return fuResourceId;
00071 }
00072
00073
00074 void IPCMethod::dumpEvent(FUShmRawCell* cell) {
00075 std::ostringstream oss;
00076 oss << "/tmp/evt" << cell->evtNumber() << ".dump";
00077 ofstream fout(oss.str().c_str());
00078 fout.fill('0');
00079
00080 fout << "#\n# evt " << cell->evtNumber() << "\n#\n" << endl;
00081 for (unsigned int i = 0; i < cell->nFed(); i++) {
00082 if (cell->fedSize(i) == 0)
00083 continue;
00084 fout << "# fedid " << i << endl;
00085 unsigned char* addr = cell->fedAddr(i);
00086 for (unsigned int j = 0; j < cell->fedSize(i); j++) {
00087 fout << std::setiosflags(std::ios::right) << std::setw(2)
00088 << std::hex << (int) (*addr) << std::dec;
00089 if ((j + 1) % 8)
00090 fout << " ";
00091 else
00092 fout << endl;
00093 ++addr;
00094 }
00095 fout << endl;
00096 }
00097 fout.close();
00098 }
00099
00100
00101 std::string IPCMethod::printStatus() {
00102 std::string s = "Status not implemented";
00103 return s;
00104 }
00105
00107
00109
00110
00111 void IPCMethod::sendAllocate() {
00112 UInt_t nbFreeSlots = this->nbFreeSlots();
00113
00114
00115 UInt_t nbFreeSlotsMax = freeResRequiredForAllocate_;
00116
00117 if (nbFreeSlots > nbFreeSlotsMax) {
00118 UIntVec_t fuResourceIds;
00119 for (UInt_t i = 0; i < nbFreeSlots; i++)
00120 fuResourceIds.push_back(allocateResource());
00121
00122 bu_->sendAllocate(fuResourceIds);
00123
00124 nbAllocSent_++;
00125 }
00126 }
00127
00128
00129 void IPCMethod::resetPendingAllocates() {
00130 if (freeResourceIds_.size() < nbRawCells_) {
00131 LOG4CPLUS_INFO(
00132 log_,
00133 "There are " << nbRawCells_ - freeResourceIds_.size()
00134 << " pending ALLOCATE messages! Forgetting...");
00135 while (!freeResourceIds_.empty())
00136 freeResourceIds_.pop();
00137 for (UInt_t i = 0; i < nbRawCells_; i++)
00138 freeResourceIds_.push(i);
00139 }
00140 }
00141
00142
00143 void IPCMethod::releaseResources() {
00144 for (UInt_t i = 0; i < resources_.size(); i++)
00145 resources_[i]->release(true);
00146
00147 }
00148
00149
00150 void IPCMethod::sendDiscard(UInt_t buResourceId) {
00151 bu_->sendDiscard(buResourceId);
00152 nbDiscarded_++;
00153 }
00154
00155
00156 void IPCMethod::sendInitMessage(UInt_t fuResourceId, UInt_t outModId,
00157 UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize,
00158 UInt_t nExpectedEPs) {
00159 if (0 == sm_) {
00160 LOG4CPLUS_ERROR(log_, "No StorageManager, DROP INIT MESSAGE!");
00161 } else {
00162 acceptSMDataDiscard_[fuResourceId] = true;
00163 UInt_t nbBytes = sm_->sendInitMessage(fuResourceId, outModId,
00164 fuProcessId, fuGuid, data, dataSize, nExpectedEPs);
00165 sumOfSquares_ += (uint64_t) nbBytes * (uint64_t) nbBytes;
00166 sumOfSizes_ += nbBytes;
00167 }
00168 }
00169
00170
00171 void IPCMethod::sendDataEvent(UInt_t fuResourceId, UInt_t runNumber,
00172 UInt_t evtNumber, UInt_t outModId, UInt_t fuProcessId, UInt_t fuGuid,
00173 UChar_t *data, UInt_t dataSize) {
00174 if (0 == sm_) {
00175 LOG4CPLUS_ERROR(log_, "No StorageManager, DROP DATA EVENT!");
00176 } else {
00177 acceptSMDataDiscard_[fuResourceId] = true;
00178 UInt_t nbBytes = sm_->sendDataEvent(fuResourceId, runNumber, evtNumber,
00179 outModId, fuProcessId, fuGuid, data, dataSize);
00180 sumOfSquares_ += (uint64_t) nbBytes * (uint64_t) nbBytes;
00181 sumOfSizes_ += nbBytes;
00182 }
00183 }
00184
00185
00186 void IPCMethod::sendErrorEvent(UInt_t fuResourceId, UInt_t runNumber,
00187 UInt_t evtNumber, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data,
00188 UInt_t dataSize) {
00189 if (0 == sm_) {
00190 LOG4CPLUS_ERROR(log_, "No StorageManager, DROP ERROR EVENT!");
00191 } else {
00192 acceptSMDataDiscard_[fuResourceId] = true;
00193 UInt_t nbBytes = sm_->sendErrorEvent(fuResourceId, runNumber,
00194 evtNumber, fuProcessId, fuGuid, data, dataSize);
00195 sumOfSquares_ += (uint64_t) nbBytes * (uint64_t) nbBytes;
00196 sumOfSizes_ += nbBytes;
00197 }
00198
00199
00200
00201
00202
00203
00204
00205
00206
00207
00208
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221 }
00222
00223
00224 void IPCMethod::sendDqmEvent(UInt_t fuDqmId, UInt_t runNumber,
00225 UInt_t evtAtUpdate, UInt_t folderId, UInt_t fuProcessId, UInt_t fuGuid,
00226 UChar_t* data, UInt_t dataSize) {
00227 if (0 == sm_) {
00228 LOG4CPLUS_WARN(log_, "No StorageManager, DROP DQM EVENT.");
00229 } else {
00230
00231 nbPendingSMDqmDiscards_++;
00232
00233 acceptSMDqmDiscard_[fuDqmId]++;
00234 if (acceptSMDqmDiscard_[fuDqmId] > 1)
00235 LOG4CPLUS_WARN(
00236 log_,
00237 "DQM Cell " << fuDqmId
00238 << " being sent more than once for folder "
00239 << folderId << " process " << fuProcessId
00240 << " guid " << fuGuid);
00241 nbSentDqm_++;
00242 sm_->sendDqmEvent(fuDqmId, runNumber, evtAtUpdate, folderId,
00243 fuProcessId, fuGuid, data, dataSize);
00244 }
00245 }
00246
00247
00248 bool IPCMethod::isLastMessageOfEvent(MemRef_t* bufRef) {
00249 while (0 != bufRef->getNextReference())
00250 bufRef = bufRef->getNextReference();
00251
00252 I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block =
00253 (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*) bufRef->getDataLocation();
00254
00255 UInt_t iBlock = block->blockNb;
00256 UInt_t nBlock = block->nbBlocksInSuperFragment;
00257 UInt_t iSuperFrag = block->superFragmentNb;
00258 UInt_t nSuperFrag = block->nbSuperFragmentsInEvent;
00259
00260 return ((iSuperFrag == nSuperFrag - 1) && (iBlock == nBlock - 1));
00261 }
00262
00263
00264 void IPCMethod::injectCRCError() {
00265 for (UInt_t i = 0; i < resources_.size(); i++) {
00266 resources_[i]->scheduleCRCError();
00267 }
00268 }