CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_6_1_1/src/EventFilter/ResourceBroker/src/IPCMethod.cc

Go to the documentation of this file.
00001 
00002 //
00003 // IPCMethod.cc
00004 // -------
00005 //
00006 // Contains common functionality for FUResourceTable and FUResourceQueue.
00007 //
00008 //  Created on: Oct 26, 2011
00009 //                                                                              Andrei Spataru : aspataru@cern.ch
00011 
00012 #include "EventFilter/ResourceBroker/interface/FUResourceTable.h"
00013 #include "EventFilter/ResourceBroker/interface/IPCMethod.h"
00014 
00015 #include "interface/evb/i2oEVBMsgs.h"
00016 
00017 #include <iomanip>
00018 
00019 using std::ofstream;
00020 using std::endl;
00021 using namespace evf;
00022 
00024 // construction/destruction
00026 
00027 //______________________________________________________________________________
00028 IPCMethod::IPCMethod(bool segmentationMode, UInt_t nbRawCells,
00029                 UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize,
00030                 UInt_t recoCellSize, UInt_t dqmCellSize, int freeResReq, BUProxy *bu,
00031                 SMProxy *sm, log4cplus::Logger logger, unsigned int timeout,
00032                 EvffedFillerRB *frb, xdaq::Application*app) throw (evf::Exception) :
00033         bu_(bu), sm_(sm), log_(logger), nbDqmCells_(nbDqmCells),
00034                         nbRawCells_(nbRawCells), nbRecoCells_(nbRecoCells),
00035                         acceptSMDataDiscard_(0), acceptSMDqmDiscard_(0), doCrcCheck_(1),
00036                         shutdownTimeout_(timeout), nbPending_(0), nbClientsToShutDown_(0),
00037                         isReadyToShutDown_(true), isActive_(false), runNumber_(0xffffffff),
00038                         frb_(frb), app_(app) {
00039 
00040         // if the freeResRequiredForAllocate_ threshold is set in configuration use that
00041         // otherwise use nbRawCells / 2
00042         if (freeResReq < 0)
00043                 freeResRequiredForAllocate_ = nbRawCells_ / 2;
00044         else
00045                 freeResRequiredForAllocate_ = freeResReq;
00046 
00047         sem_init(&lock_, 0, 1);
00048         //pthread_mutex_init(&crashHandlerLock_, NULL);
00049 }
00050 
00051 IPCMethod::~IPCMethod() {
00052 
00053 }
00054 
00056 // implementation of member functions
00058 
00059 //______________________________________________________________________________
00060 UInt_t IPCMethod::allocateResource() {
00061         assert(!freeResourceIds_.empty());
00062 
00063         lock();
00064         UInt_t fuResourceId = freeResourceIds_.front();
00065         freeResourceIds_.pop();
00066         nbPending_++;
00067         nbAllocated_++;
00068         unlock();
00069 
00070         return fuResourceId;
00071 }
00072 
00073 //______________________________________________________________________________
00074 void IPCMethod::dumpEvent(FUShmRawCell* cell) {
00075         std::ostringstream oss;
00076         oss << "/tmp/evt" << cell->evtNumber() << ".dump";
00077         ofstream fout(oss.str().c_str());
00078         fout.fill('0');
00079 
00080         fout << "#\n# evt " << cell->evtNumber() << "\n#\n" << endl;
00081         for (unsigned int i = 0; i < cell->nFed(); i++) {
00082                 if (cell->fedSize(i) == 0)
00083                         continue;
00084                 fout << "# fedid " << i << endl;
00085                 unsigned char* addr = cell->fedAddr(i);
00086                 for (unsigned int j = 0; j < cell->fedSize(i); j++) {
00087                         fout << std::setiosflags(std::ios::right) << std::setw(2)
00088                                         << std::hex << (int) (*addr) << std::dec;
00089                         if ((j + 1) % 8)
00090                                 fout << " ";
00091                         else
00092                                 fout << endl;
00093                         ++addr;
00094                 }
00095                 fout << endl;
00096         }
00097         fout.close();
00098 }
00099 
00100 //______________________________________________________________________________
00101 std::string IPCMethod::printStatus() {
00102         std::string s = "Status not implemented";
00103         return s;
00104 }
00105 
00107 // implementation of private member functions
00109 
00110 //______________________________________________________________________________
00111 void IPCMethod::sendAllocate() {
00112         UInt_t nbFreeSlots = this->nbFreeSlots();
00113         /*UInt_t nbFreeSlotsMax = 0*///reverting to larger chunk requests for BU
00114         //UInt_t nbFreeSlotsMax = nbResources() / 2;
00115         UInt_t nbFreeSlotsMax = freeResRequiredForAllocate_;
00116 
00117         if (nbFreeSlots > nbFreeSlotsMax) {
00118                 UIntVec_t fuResourceIds;
00119                 for (UInt_t i = 0; i < nbFreeSlots; i++)
00120                         fuResourceIds.push_back(allocateResource());
00121 
00122                 bu_->sendAllocate(fuResourceIds);
00123 
00124                 nbAllocSent_++;
00125         }
00126 }
00127 
00128 //______________________________________________________________________________
00129 void IPCMethod::resetPendingAllocates() {
00130         if (freeResourceIds_.size() < nbRawCells_) {
00131                 LOG4CPLUS_INFO(
00132                                 log_,
00133                                 "There are " << nbRawCells_ - freeResourceIds_.size()
00134                                                 << " pending ALLOCATE messages! Forgetting...");
00135                 while (!freeResourceIds_.empty())
00136                         freeResourceIds_.pop();
00137                 for (UInt_t i = 0; i < nbRawCells_; i++)
00138                         freeResourceIds_.push(i);
00139         }
00140 }
00141 
00142 //______________________________________________________________________________
00143 void IPCMethod::releaseResources() {
00144         for (UInt_t i = 0; i < resources_.size(); i++)
00145                 resources_[i]->release(true);
00146 
00147 }
00148 
00149 //______________________________________________________________________________
00150 void IPCMethod::sendDiscard(UInt_t buResourceId) {
00151         bu_->sendDiscard(buResourceId);
00152         nbDiscarded_++;
00153 }
00154 
00155 //______________________________________________________________________________
00156 void IPCMethod::sendInitMessage(UInt_t fuResourceId, UInt_t outModId,
00157                 UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize,
00158                 UInt_t nExpectedEPs) {
00159         if (0 == sm_) {
00160                 LOG4CPLUS_ERROR(log_, "No StorageManager, DROP INIT MESSAGE!");
00161         } else {
00162                 acceptSMDataDiscard_[fuResourceId] = true;
00163                 UInt_t nbBytes = sm_->sendInitMessage(fuResourceId, outModId,
00164                                 fuProcessId, fuGuid, data, dataSize, nExpectedEPs);
00165                 sumOfSquares_ += (uint64_t) nbBytes * (uint64_t) nbBytes;
00166                 sumOfSizes_ += nbBytes;
00167         }
00168 }
00169 
00170 //______________________________________________________________________________
00171 void IPCMethod::sendDataEvent(UInt_t fuResourceId, UInt_t runNumber,
00172                 UInt_t evtNumber, UInt_t outModId, UInt_t fuProcessId, UInt_t fuGuid,
00173                 UChar_t *data, UInt_t dataSize) {
00174         if (0 == sm_) {
00175                 LOG4CPLUS_ERROR(log_, "No StorageManager, DROP DATA EVENT!");
00176         } else {
00177                 acceptSMDataDiscard_[fuResourceId] = true;
00178                 UInt_t nbBytes = sm_->sendDataEvent(fuResourceId, runNumber, evtNumber,
00179                                 outModId, fuProcessId, fuGuid, data, dataSize);
00180                 sumOfSquares_ += (uint64_t) nbBytes * (uint64_t) nbBytes;
00181                 sumOfSizes_ += nbBytes;
00182         }
00183 }
00184 
00185 //______________________________________________________________________________
00186 void IPCMethod::sendErrorEvent(UInt_t fuResourceId, UInt_t runNumber,
00187                 UInt_t evtNumber, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data,
00188                 UInt_t dataSize) {
00189         if (0 == sm_) {
00190                 LOG4CPLUS_ERROR(log_, "No StorageManager, DROP ERROR EVENT!");
00191         } else {
00192                 acceptSMDataDiscard_[fuResourceId] = true;
00193                 UInt_t nbBytes = sm_->sendErrorEvent(fuResourceId, runNumber,
00194                                 evtNumber, fuProcessId, fuGuid, data, dataSize);
00195                 sumOfSquares_ += (uint64_t) nbBytes * (uint64_t) nbBytes;
00196                 sumOfSizes_ += nbBytes;
00197         }
00198 
00199         //   if (0!=shmBuffer_) {
00200         //     UInt_t n=nbDqmCells_;
00201 
00202         //     for (UInt_t i=0;i<n;i++) {
00203         //       if(shmBuffer_->dqmCell(i)->fuProcessId()==fuProcessId)
00204         //      {
00205         //        if(shmBuffer_->dqmState(i)!=dqm::SENT){
00206         //          shmBuffer_->setDqmState(i,dqm::SENT);
00207         //          shmBuffer_->discardDqmCell(i);
00208         //          acceptSMDqmDiscard_[i] = false;
00209         //        }
00210         //      }
00211         //     }
00212         //     n=nbRecoCells_;
00213         //     for (UInt_t i=0;i<n;i++) {
00214         //       if(shmBuffer_->recoCell(i)->fuProcessId()==fuProcessId)
00215         //      {
00216         //        shmBuffer_->discardOrphanedRecoCell(i);
00217         //      }
00218         //     }
00219 
00220         //   }
00221 }
00222 
00223 //______________________________________________________________________________
00224 void IPCMethod::sendDqmEvent(UInt_t fuDqmId, UInt_t runNumber,
00225                 UInt_t evtAtUpdate, UInt_t folderId, UInt_t fuProcessId, UInt_t fuGuid,
00226                 UChar_t* data, UInt_t dataSize) {
00227         if (0 == sm_) {
00228                 LOG4CPLUS_WARN(log_, "No StorageManager, DROP DQM EVENT.");
00229         } else {
00230 
00231                 nbPendingSMDqmDiscards_++;
00232 
00233                 acceptSMDqmDiscard_[fuDqmId]++;
00234                 if (acceptSMDqmDiscard_[fuDqmId] > 1)
00235                         LOG4CPLUS_WARN(
00236                                         log_,
00237                                         "DQM Cell " << fuDqmId
00238                                                         << " being sent more than once for folder "
00239                                                         << folderId << " process " << fuProcessId
00240                                                         << " guid " << fuGuid);
00241                 nbSentDqm_++;
00242                 sm_->sendDqmEvent(fuDqmId, runNumber, evtAtUpdate, folderId,
00243                                 fuProcessId, fuGuid, data, dataSize);
00244         }
00245 }
00246 
00247 //______________________________________________________________________________
00248 bool IPCMethod::isLastMessageOfEvent(MemRef_t* bufRef) {
00249         while (0 != bufRef->getNextReference())
00250                 bufRef = bufRef->getNextReference();
00251 
00252         I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block =
00253                         (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*) bufRef->getDataLocation();
00254 
00255         UInt_t iBlock = block->blockNb;
00256         UInt_t nBlock = block->nbBlocksInSuperFragment;
00257         UInt_t iSuperFrag = block->superFragmentNb;
00258         UInt_t nSuperFrag = block->nbSuperFragmentsInEvent;
00259 
00260         return ((iSuperFrag == nSuperFrag - 1) && (iBlock == nBlock - 1));
00261 }
00262 
00263 //______________________________________________________________________________
00264 void IPCMethod::injectCRCError() {
00265         for (UInt_t i = 0; i < resources_.size(); i++) {
00266                 resources_[i]->scheduleCRCError();
00267         }
00268 }