#include <DiscardManager.h>
Public Types | |
typedef std::map< HLTSenderKey, boost::shared_ptr< FUProxy > > | FUProxyMap |
typedef boost::shared_ptr < FUProxy > | FUProxyPtr |
typedef std::pair< std::string, unsigned int > | HLTSenderKey |
Public Member Functions | |
void | configure () |
DiscardManager (xdaq::ApplicationContext *, xdaq::ApplicationDescriptor *, DataSenderMonitorCollection &) | |
bool | sendDiscardMessage (I2OChain const &) |
Private Member Functions | |
FUProxyPtr | getProxyFromCache (std::string const &hltClassName, unsigned int const &hltInstance) |
FUProxyPtr | makeNewFUProxy (std::string const &hltClassName, unsigned int const &hltInstance) |
Private Attributes | |
xdaq::ApplicationContext * | appContext_ |
xdaq::ApplicationDescriptor * | appDescriptor_ |
DataSenderMonitorCollection & | dataSenderMonCollection_ |
toolbox::mem::Pool * | msgPool_ |
FUProxyMap | proxyCache_ |
Handles the discard messages sent to the upstream Resource Brokers.
Definition at line 33 of file DiscardManager.h.
typedef std::map< HLTSenderKey, boost::shared_ptr<FUProxy> > stor::DiscardManager::FUProxyMap |
Definition at line 39 of file DiscardManager.h.
typedef boost::shared_ptr<FUProxy> stor::DiscardManager::FUProxyPtr |
Definition at line 38 of file DiscardManager.h.
typedef std::pair<std::string, unsigned int> stor::DiscardManager::HLTSenderKey |
Definition at line 37 of file DiscardManager.h.
stor::DiscardManager::DiscardManager | ( | xdaq::ApplicationContext * | ctx, |
xdaq::ApplicationDescriptor * | desc, | ||
DataSenderMonitorCollection & | dsmc | ||
) |
Creates a DiscardManager that will send discard messages to upstream Resource Brokers on behalf of the application specified in the application descriptor. The DiscardManager will use the specified application context to send the messages.
Definition at line 20 of file DiscardManager.cc.
References a.
: appContext_(ctx),
  appDescriptor_(desc),
  dataSenderMonCollection_(dsmc)
{
  // The message pool is named after the owning application: its class name
  // immediately followed by its instance number (no separator).
  std::ostringstream poolNameBuilder;
  poolNameBuilder << desc->getClassName();
  poolNameBuilder << desc->getInstance();

  toolbox::net::URN poolUrn("toolbox-mem-pool", poolNameBuilder.str());

  // NOTE(review): the allocator is handed to the pool factory and never
  // deleted here; presumably the created pool takes ownership -- confirm.
  toolbox::mem::HeapAllocator* a = new toolbox::mem::HeapAllocator();
  msgPool_ = toolbox::mem::getMemoryPoolFactory()->createPool(poolUrn, a);
}
void stor::DiscardManager::configure | ( | ) |
Configures the discard manager.
Definition at line 37 of file DiscardManager.cc.
References proxyCache_.
{
  // Forget every cached FUProxy. After this, getProxyFromCache() misses on
  // each key and builds a fresh proxy via makeNewFUProxy() on first use.
  proxyCache_.clear();
}
DiscardManager::FUProxyPtr stor::DiscardManager::getProxyFromCache | ( | std::string const & | hltClassName, |
unsigned int const & | hltInstance | ||
) | [private] |
Definition at line 85 of file DiscardManager.cc.
References pos.
Referenced by sendDiscardMessage().
{
  // Returns the cached proxy for (hltClassName, hltInstance), creating and
  // caching one on a miss. The result is a null pointer when no proxy could
  // be created; null results are not cached, so a later call tries again.
  const HLTSenderKey mapKey(hltClassName, hltInstance);

  // lower_bound() yields the first entry whose key is not less than mapKey.
  // It is an exact match only if mapKey is not less than that key either.
  FUProxyMap::iterator pos = proxyCache_.lower_bound(mapKey);
  const bool cacheHit =
    (pos != proxyCache_.end()) && !proxyCache_.key_comp()(mapKey, pos->first);
  if (cacheHit)
  {
    return pos->second;
  }

  FUProxyPtr fuProxyPtr = makeNewFUProxy(hltClassName, hltInstance);
  if (fuProxyPtr.get() != 0)
  {
    // pos doubles as the insertion hint, which spares the map a second lookup.
    proxyCache_.insert(pos, FUProxyMap::value_type(mapKey, fuProxyPtr));
  }
  return fuProxyPtr;
}
DiscardManager::FUProxyPtr stor::DiscardManager::makeNewFUProxy | ( | std::string const & | hltClassName, |
unsigned int const & | hltInstance | ||
) | [private] |
Definition at line 110 of file DiscardManager.cc.
{
  // Builds a proxy for the application of class hltClassName whose instance
  // number equals hltInstance. Returns a null pointer if the default zone
  // lists no such application.
  typedef std::set<xdaq::ApplicationDescriptor*> DescriptorSet;

  DescriptorSet candidates =
    appContext_->getDefaultZone()->getApplicationDescriptors(hltClassName.c_str());

  for (DescriptorSet::iterator it = candidates.begin();
       it != candidates.end(); ++it)
  {
    xdaq::ApplicationDescriptor* rbDescriptor = *it;
    if (rbDescriptor->getInstance() != hltInstance)
    {
      continue;
    }

    // Only the first matching descriptor is used.
    return FUProxyPtr(new FUProxy(appDescriptor_, rbDescriptor,
                                  appContext_, msgPool_));
  }

  return FUProxyPtr();
}
bool stor::DiscardManager::sendDiscardMessage | ( | I2OChain const & | i2oMessage | ) |
Sends a message to the appropriate resource broker telling it that the SM has received and processed the specified I2O message. At that point, we expect that the resource broker will free up the buffer that contained the original event (or INIT message or DQMEvent or whatever) and do whatever other cleanup it may need to do.
There are two failure modes to this method. In the first, the I2OChain could be so badly corrupted that the target resource broker cannot be determined. In that case, this method simply returns false. (Since we expect that these messages will be sent to a special error stream, we will rely on their presence in that stream to indicate that something went very wrong.) In the second failure mode, the I2OChain was parsable, but the lookup of the RB in the XDAQ network failed. In that case, this method throws an exception. (This should never happen, so we will treat it as an exceptional condition.)
Exceptions: throws a stor::exception::RBLookupFailed exception if the appropriate resource broker cannot be determined.
Definition at line 42 of file DiscardManager.cc.
References dataSenderMonCollection_, Header::DQM_EVENT, getProxyFromCache(), stor::I2OChain::hltClassName(), stor::I2OChain::hltInstance(), stor::DataSenderMonitorCollection::incrementDataDiscardCount(), stor::DataSenderMonitorCollection::incrementDQMDiscardCount(), stor::DataSenderMonitorCollection::incrementSkippedDiscardCount(), INVALID, stor::I2OChain::messageCode(), lumiQueryAPI::msg, and stor::I2OChain::rbBufferId().
{
  // A message flagged INVALID is only counted as skipped; no discard is
  // sent for it and the caller is told so through the false return value.
  if (i2oMessage.messageCode() == Header::INVALID)
  {
    dataSenderMonCollection_.incrementSkippedDiscardCount(i2oMessage);
    return false;
  }

  const unsigned int rbBufferId = i2oMessage.rbBufferId();
  const std::string hltClassName = i2oMessage.hltClassName();
  const unsigned int hltInstance = i2oMessage.hltInstance();

  FUProxyPtr fuProxyPtr = getProxyFromCache(hltClassName, hltInstance);
  if (fuProxyPtr.get() != 0)
  {
    // DQM events have their own discard message and counter; every other
    // message code goes through the data discard path.
    if (i2oMessage.messageCode() == Header::DQM_EVENT)
    {
      fuProxyPtr->sendDQMDiscard(rbBufferId);
      dataSenderMonCollection_.incrementDQMDiscardCount(i2oMessage);
    }
    else
    {
      fuProxyPtr->sendDataDiscard(rbBufferId);
      dataSenderMonCollection_.incrementDataDiscardCount(i2oMessage);
    }
  }
  else
  {
    // No proxy could be found or created for the sender: count the message
    // as skipped, then raise RBLookupFailed naming the class and instance.
    dataSenderMonCollection_.incrementSkippedDiscardCount(i2oMessage);

    std::stringstream msg;
    msg << "Unable to find the resource broker corresponding to "
        << "classname = \"" << hltClassName
        << "\" and instance = \"" << hltInstance
        << "\".";
    XCEPT_RAISE(exception::RBLookupFailed, msg.str());
  }

  return true;
}
xdaq::ApplicationContext* stor::DiscardManager::appContext_ [private] |
Definition at line 99 of file DiscardManager.h.
xdaq::ApplicationDescriptor* stor::DiscardManager::appDescriptor_ [private] |
Definition at line 100 of file DiscardManager.h.
DataSenderMonitorCollection& stor::DiscardManager::dataSenderMonCollection_ [private] |
Definition at line 105 of file DiscardManager.h.
Referenced by sendDiscardMessage().
toolbox::mem::Pool* stor::DiscardManager::msgPool_ [private] |
Definition at line 101 of file DiscardManager.h.
FUProxyMap stor::DiscardManager::proxyCache_ [private] |
Definition at line 103 of file DiscardManager.h.
Referenced by configure().