00001
00002
00003
00004
00005
00006 #include "EventFilter/StorageManager/interface/SMFUSenderList.h"
00007 #include "FWCore/Utilities/interface/DebugMacros.h"
00008
00009 using namespace stor;
00010 using namespace std;
00011 using namespace edm;
00012
00013 SMFUSenderList::SMFUSenderList()
00014 {
00015 FDEBUG(10) << "SMFUSenderList: Making a SMFUSenderList" << endl;
00016 }
00017
00018 void SMFUSenderList::clear()
00019 {
00020 boost::mutex::scoped_lock sl(list_lock_);
00021 senderlist_.clear();
00022 numberOfRB_ = 0;
00023 numberOfOM_ = 0;
00024 }
00025
00026 unsigned int SMFUSenderList::size()
00027 {
00028 boost::mutex::scoped_lock sl(list_lock_);
00029 return (unsigned int)senderlist_.size();
00030 }
00031
00032 unsigned int SMFUSenderList::numberOfFU()
00033 {
00034 boost::mutex::scoped_lock sl(list_lock_);
00035 unsigned int num_RBxOMxFU = (unsigned int)senderlist_.size();
00036 if(numberOfRB_ != 0 && numberOfOM_ != 0)
00037 return (num_RBxOMxFU/(numberOfRB_ * numberOfOM_));
00038 else
00039 return 0;
00040
00041 }
00042
00043 boost::shared_ptr<stor::SMFUSenderEntry> SMFUSenderList::findFirstEntry(const char* hltURL,
00044 const char* hltClassName,
00045 const unsigned int hltLocalId,
00046 const unsigned int hltInstance,
00047 const unsigned int hltTid)
00048 {
00049
00050 boost::shared_ptr<stor::SMFUSenderEntry> entryPtr;
00051 if(senderlist_.empty()) return entryPtr;
00052
00053 for(list<boost::shared_ptr<stor::SMFUSenderEntry> >::iterator pos = senderlist_.begin();
00054 pos != senderlist_.end(); ++pos)
00055 {
00056 if((*pos)->matchFirst(hltURL, hltClassName, hltLocalId, hltInstance, hltTid))
00057 {
00058 entryPtr = (*pos);
00059 return entryPtr;
00060 }
00061 }
00062 return entryPtr;
00063 }
00064
00065 boost::shared_ptr<stor::SMFUSenderEntry> SMFUSenderList::findEntry(const char* hltURL,
00066 const char* hltClassName,
00067 const unsigned int hltLocalId,
00068 const unsigned int hltInstance,
00069 const unsigned int hltTid,
00070 const uint32 rbBufferID,
00071 const std::string outModName)
00072 {
00073
00074 boost::shared_ptr<stor::SMFUSenderEntry> entryPtr;
00075 if(senderlist_.empty()) return entryPtr;
00076
00077 for(list<boost::shared_ptr<stor::SMFUSenderEntry> >::iterator pos = senderlist_.begin();
00078 pos != senderlist_.end(); ++pos)
00079 {
00080 if((*pos)->match(hltURL, hltClassName, hltLocalId, hltInstance, hltTid, rbBufferID, outModName))
00081 {
00082 entryPtr = (*pos);
00083 return entryPtr;
00084 }
00085 }
00086 return entryPtr;
00087 }
00088
00089 boost::shared_ptr<stor::SMFUSenderEntry> SMFUSenderList::findFirstEntry(const char* hltURL,
00090 const char* hltClassName,
00091 const unsigned int hltLocalId,
00092 const unsigned int hltInstance,
00093 const unsigned int hltTid,
00094 const std::string outModName)
00095 {
00096
00097 boost::shared_ptr<stor::SMFUSenderEntry> entryPtr;
00098 if(senderlist_.empty()) return entryPtr;
00099
00100 for(list<boost::shared_ptr<stor::SMFUSenderEntry> >::iterator pos = senderlist_.begin();
00101 pos != senderlist_.end(); ++pos)
00102 {
00103 if((*pos)->matchFirst(hltURL, hltClassName, hltLocalId, hltInstance, hltTid, outModName))
00104 {
00105 entryPtr = (*pos);
00106 return entryPtr;
00107 }
00108 }
00109 return entryPtr;
00110 }
00111
00112 boost::shared_ptr<stor::SMFUSenderEntry> SMFUSenderList::findFirstEntry(const char* hltURL,
00113 const char* hltClassName,
00114 const unsigned int hltLocalId,
00115 const unsigned int hltInstance,
00116 const unsigned int hltTid,
00117 const uint32 outModId)
00118 {
00119
00120 boost::shared_ptr<stor::SMFUSenderEntry> entryPtr;
00121 if(senderlist_.empty()) return entryPtr;
00122
00123 for(list<boost::shared_ptr<stor::SMFUSenderEntry> >::iterator pos = senderlist_.begin();
00124 pos != senderlist_.end(); ++pos)
00125 {
00126 if((*pos)->matchFirst(hltURL, hltClassName, hltLocalId, hltInstance, hltTid, outModId))
00127 {
00128 entryPtr = (*pos);
00129 return entryPtr;
00130 }
00131 }
00132 return entryPtr;
00133 }
00134
00135 boost::shared_ptr<stor::SMFUSenderEntry> SMFUSenderList::addEntry(const char* hltURL,
00136 const char* hltClassName, const unsigned int hltLocalId,
00137 const unsigned int hltInstance, const unsigned int hltTid,
00138 const unsigned int frameCount, const unsigned int numFrames,
00139 const uint32 regSize, const std::string outModName,
00140 const uint32 outModId, const uint32 rbBufferID)
00141 {
00142 boost::shared_ptr<stor::SMFUSenderEntry> entry_p(new SMFUSenderEntry(hltURL, hltClassName,
00143 hltLocalId, hltInstance, hltTid, frameCount, numFrames,
00144 outModName, outModId, rbBufferID, regSize));
00145 senderlist_.push_back(entry_p);
00146 return entry_p;
00147 }
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168
00169 int SMFUSenderList::registerDataSender(const char* hltURL,
00170 const char* hltClassName, const unsigned int hltLocalId,
00171 const unsigned int hltInstance, const unsigned int hltTid,
00172 const unsigned int frameCount, const unsigned int numFrames,
00173 const uint32 regSize, const std::string outModName,
00174 const uint32 outModId, const uint32 rbBufferID)
00175 {
00176
00177
00178 boost::mutex::scoped_lock sl(list_lock_);
00179 boost::shared_ptr<stor::SMFUSenderEntry> foundPos = findEntry(hltURL, hltClassName, hltLocalId,
00180 hltInstance, hltTid, rbBufferID, outModName);
00181 if(foundPos != NULL)
00182 {
00183
00184 bool sameOutMod = foundPos->sameOutMod(outModName);
00185 if(!sameOutMod)
00186 {
00187 FDEBUG(10) << "registerDataSender: found a new output Module " << outModName << " for URL "
00188 << hltURL << " and Tid " << hltTid << " rb buffer ID " << rbBufferID << std::endl;
00189 foundPos->addReg2Entry(frameCount, numFrames, outModName, outModId, regSize);
00190 if(foundPos->regIsCopied(outModName)) {
00191 return 1;
00192 } else {
00193 return 0;
00194 }
00195 } else {
00196 FDEBUG(10) << "registerDataSender: found another frame " << frameCount << " for URL "
00197 << hltURL << " Tid " << hltTid << " rb buffer ID " << rbBufferID
00198 << " and output module " << outModName << std::endl;
00199
00200
00201 bool regComplete = foundPos->addFrame(frameCount, numFrames, regSize, outModName);
00202 if(regComplete) {
00203 return 1;
00204 } else {
00205 return 0;
00206 }
00207 }
00208 } else {
00209 FDEBUG(9) << "registerDataSender: found a different FU Sender with frame "
00210 << frameCount << " for URL "
00211 << hltURL << " and Tid " << hltTid << " rb buffer ID " << rbBufferID << std::endl;
00212
00213 boost::shared_ptr<stor::SMFUSenderEntry> tempPos = findFirstEntry(hltURL, hltClassName, hltLocalId,
00214 hltInstance, hltTid);
00215 if(tempPos == NULL) ++numberOfRB_;
00216 tempPos = findFirstEntry(hltURL, hltClassName, hltLocalId, hltInstance, hltTid, outModName);
00217 if(tempPos == NULL) ++numberOfOM_;
00218
00219
00220 foundPos = addEntry(hltURL, hltClassName, hltLocalId, hltInstance, hltTid,
00221 frameCount, numFrames, regSize, outModName, outModId, rbBufferID);
00222
00223 if(foundPos == NULL)
00224 {
00225 FDEBUG(9) << "registerDataSender: registering new FU sender at " << hltURL
00226 << " failed! List size is " << senderlist_.size() << std::endl;
00227 return -1;
00228 } else {
00229 if(foundPos->regIsCopied(outModName)) {
00230 return 1;
00231 } else {
00232 return 0;
00233 }
00234 }
00235 }
00236 }
00237
00238 int SMFUSenderList::updateSender4data(const char* hltURL,
00239 const char* hltClassName,
00240 const unsigned int hltLocalId,
00241 const unsigned int hltInstance,
00242 const unsigned int hltTid,
00243 const unsigned int runNumber,
00244 const unsigned int eventNumber,
00245 const unsigned int frameNum,
00246 const unsigned int totalFrames,
00247 const unsigned int origdatasize,
00248 const bool isLocal,
00249 const uint32 outModId)
00250 {
00251
00252 boost::mutex::scoped_lock sl(list_lock_);
00253 boost::shared_ptr<stor::SMFUSenderEntry> foundPos = findFirstEntry(hltURL, hltClassName, hltLocalId,
00254 hltInstance, hltTid, outModId);
00255 if(foundPos != NULL)
00256 {
00257
00258 if(!foundPos->getDataStatus())
00259 {
00260 foundPos->setDataStatus();
00261 FDEBUG(9) << "updateSender4data: received first data frame for URL"
00262 << hltURL << " and Tid " << hltTid << std::endl;
00263 foundPos->setrunNumber(runNumber);
00264 foundPos->setisLocal(isLocal);
00265 }
00266
00267 if(foundPos->sameOutMod(outModId)) {
00268 bool gotEvt = foundPos->update4Data(runNumber, eventNumber, frameNum,
00269 totalFrames, origdatasize, outModId);
00270 if(gotEvt) {
00271 return 1;
00272 } else {
00273 return 0;
00274 }
00275 } else {
00276
00277 FDEBUG(9) << "updateSender4data: Cannot find output module Id "
00278 << outModId << " in FU Sender Entry!"
00279 << " With URL "
00280 << hltURL << " class " << hltClassName << " instance "
00281 << hltInstance << " Tid " << hltTid << std::endl;
00282 return -1;
00283 }
00284 } else {
00285
00286 FDEBUG(9) << "updateSender4data: Cannot find FU in FU Sender list!"
00287 << " With URL "
00288 << hltURL << " class " << hltClassName << " instance "
00289 << hltInstance << " Tid " << hltTid << std::endl;
00290 return -1;
00291 }
00292 }
00293
00294
00295
00296
00297
00298
00299
00300
00301
00302
00303
00304
00305
00306 unsigned int SMFUSenderList::getRegistrySize(const char* hltURL,
00307 const char* hltClassName, const unsigned int hltLocalId,
00308 const unsigned int hltInstance, const unsigned int hltTid,
00309 const std::string outModName, const uint32 rbBufferID)
00310 {
00311 boost::mutex::scoped_lock sl(list_lock_);
00312 boost::shared_ptr<stor::SMFUSenderEntry> foundPos = findEntry(hltURL, hltClassName, hltLocalId,
00313 hltInstance, hltTid, rbBufferID, outModName);
00314 if(foundPos != NULL)
00315 {
00316 return foundPos->getregistrySize(outModName);
00317 } else {
00318 return 0;
00319 }
00320 }
00321
00322 std::vector<boost::shared_ptr<SMFUSenderStats> > SMFUSenderList::getSenderStats()
00323 {
00324 boost::mutex::scoped_lock sl(list_lock_);
00325 std::vector<boost::shared_ptr<SMFUSenderStats> > vstat;
00326 if(senderlist_.size() == 0) return vstat;
00327
00328 for(std::list<boost::shared_ptr<stor::SMFUSenderEntry> >::iterator pos = senderlist_.begin();
00329 pos != senderlist_.end(); ++pos)
00330 {
00331 boost::shared_ptr<SMFUSenderStats> fustat(new SMFUSenderStats((*pos)->getvhltURL(),
00332 (*pos)->getvhltClassName(),
00333 (*pos)->gethltLocalId(),
00334 (*pos)->gethltInstance(),
00335 (*pos)->gethltTid(),
00336 (*pos)->getrbBufferID(),
00337 (*pos)->getRegistryCollection(),
00338 (*pos)->getDatCollection(),
00339 (*pos)->getconnectStatus(),
00340 (*pos)->getlastLatency(),
00341 (*pos)->getrunNumber(),
00342 (*pos)->getisLocal(),
00343 (*pos)->getAllframesReceived(),
00344 (*pos)->getAlleventsReceived(),
00345 (*pos)->getlastEventID(),
00346 (*pos)->getlastRunID(),
00347 (*pos)->gettotalOutOfOrder(),
00348 (*pos)->getAlltotalSizeReceived(),
00349 (*pos)->gettotalBadEvents(),
00350 (*pos)->getStopWTime()));
00351 vstat.push_back(fustat);
00352 }
00353 return vstat;
00354 }