Go to the documentation of this file.00001
00003
00004 #include "EventFilter/StorageManager/interface/DataSenderMonitorCollection.h"
00005 #include "EventFilter/StorageManager/interface/DiscardManager.h"
00006 #include "EventFilter/StorageManager/interface/Exception.h"
00007 #include "EventFilter/StorageManager/interface/FUProxy.h"
00008 #include "EventFilter/StorageManager/interface/I2OChain.h"
00009
00010 #include "IOPool/Streamer/interface/MsgHeader.h"
00011
00012 #include "toolbox/mem/HeapAllocator.h"
00013 #include "toolbox/mem/MemoryPoolFactory.h"
00014 #include "toolbox/net/URN.h"
00015
00016
00017 namespace stor {
00018
00019 DiscardManager::DiscardManager
00020 (
00021 xdaq::ApplicationContext* ctx,
00022 xdaq::ApplicationDescriptor* desc,
00023 DataSenderMonitorCollection& dsmc
00024 ):
00025 appContext_(ctx),
00026 appDescriptor_(desc),
00027 dataSenderMonCollection_(dsmc)
00028 {
00029 std::ostringstream poolName;
00030 poolName << desc->getClassName() << desc->getInstance();
00031 toolbox::net::URN urn("toolbox-mem-pool", poolName.str());
00032 toolbox::mem::HeapAllocator* a = new toolbox::mem::HeapAllocator();
00033
00034 msgPool_ = toolbox::mem::getMemoryPoolFactory()->createPool(urn, a);
00035 }
00036
00037 void DiscardManager::configure()
00038 {
00039 proxyCache_.clear();
00040 }
00041
00042 bool DiscardManager::sendDiscardMessage(I2OChain const& i2oMessage)
00043 {
00044 if (i2oMessage.messageCode() == Header::INVALID)
00045 {
00046 dataSenderMonCollection_.incrementSkippedDiscardCount(i2oMessage);
00047 return false;
00048 }
00049
00050 unsigned int rbBufferId = i2oMessage.rbBufferId();
00051 std::string hltClassName = i2oMessage.hltClassName();
00052 unsigned int hltInstance = i2oMessage.hltInstance();
00053 FUProxyPtr fuProxyPtr = getProxyFromCache(hltClassName, hltInstance);
00054 if (fuProxyPtr.get() == 0)
00055 {
00056 dataSenderMonCollection_.incrementSkippedDiscardCount(i2oMessage);
00057 std::stringstream msg;
00058 msg << "Unable to find the resource broker corresponding to ";
00059 msg << "classname = \"";
00060 msg << hltClassName;
00061 msg << "\" and instance = \"";
00062 msg << hltInstance;
00063 msg << "\".";
00064 XCEPT_RAISE(exception::RBLookupFailed, msg.str());
00065 }
00066 else
00067 {
00068 if (i2oMessage.messageCode() == Header::DQM_EVENT)
00069 {
00070 fuProxyPtr->sendDQMDiscard(rbBufferId);
00071 dataSenderMonCollection_.incrementDQMDiscardCount(i2oMessage);
00072 }
00073 else
00074 {
00075 fuProxyPtr->sendDataDiscard(rbBufferId);
00076 dataSenderMonCollection_.incrementDataDiscardCount(i2oMessage);
00077 }
00078 }
00079
00080 return true;
00081 }
00082
00083 DiscardManager::FUProxyPtr
00084 DiscardManager::getProxyFromCache
00085 (
00086 std::string const& hltClassName,
00087 unsigned int const& hltInstance
00088 )
00089 {
00090 HLTSenderKey mapKey = std::make_pair(hltClassName, hltInstance);
00091 FUProxyMap::iterator pos = proxyCache_.lower_bound(mapKey);
00092
00093 if (pos == proxyCache_.end() || (proxyCache_.key_comp()(mapKey, pos->first)))
00094 {
00095
00096 FUProxyPtr fuProxyPtr = makeNewFUProxy(hltClassName, hltInstance);
00097 if (fuProxyPtr.get() != 0)
00098 pos = proxyCache_.insert(pos, FUProxyMap::value_type(mapKey, fuProxyPtr));
00099
00100 return fuProxyPtr;
00101 }
00102 else
00103 {
00104 return pos->second;
00105 }
00106 }
00107
00108 DiscardManager::FUProxyPtr
00109 DiscardManager::makeNewFUProxy
00110 (
00111 std::string const& hltClassName,
00112 unsigned int const& hltInstance
00113 )
00114 {
00115 FUProxyPtr proxyPtr;
00116 std::set<xdaq::ApplicationDescriptor*> setOfRBs=
00117 appContext_->getDefaultZone()->
00118 getApplicationDescriptors(hltClassName.c_str());
00119
00120 std::set<xdaq::ApplicationDescriptor*>::iterator iter;
00121 std::set<xdaq::ApplicationDescriptor*>::iterator iterEnd = setOfRBs.end();
00122
00123 for (iter = setOfRBs.begin(); iter != iterEnd; ++iter)
00124 {
00125 if ((*iter)->getInstance() == hltInstance)
00126 {
00127 proxyPtr.reset(new FUProxy(appDescriptor_, *iter,
00128 appContext_, msgPool_));
00129 break;
00130 }
00131 }
00132
00133 return proxyPtr;
00134 }
00135
00136 }
00137
00138