CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_6_2_7/src/EventFilter/StorageManager/src/DiscardManager.cc

Go to the documentation of this file.
00001 // $Id: DiscardManager.cc,v 1.7 2011/04/07 08:01:40 mommsen Exp $
00003 
00004 #include "EventFilter/StorageManager/interface/DataSenderMonitorCollection.h"
00005 #include "EventFilter/StorageManager/interface/DiscardManager.h"
00006 #include "EventFilter/StorageManager/interface/Exception.h"
00007 #include "EventFilter/StorageManager/interface/FUProxy.h"
00008 #include "EventFilter/StorageManager/interface/I2OChain.h"
00009 
00010 #include "IOPool/Streamer/interface/MsgHeader.h"
00011 
00012 #include "toolbox/mem/HeapAllocator.h"
00013 #include "toolbox/mem/MemoryPoolFactory.h"
00014 #include "toolbox/net/URN.h"
00015 
00016 
00017 namespace stor {
00018   
00019   DiscardManager::DiscardManager
00020   (
00021     xdaq::ApplicationContext* ctx,
00022     xdaq::ApplicationDescriptor* desc,
00023     DataSenderMonitorCollection& dsmc
00024   ):
00025   appContext_(ctx),
00026   appDescriptor_(desc),
00027   dataSenderMonCollection_(dsmc)
00028   {
00029     std::ostringstream poolName;
00030     poolName << desc->getClassName() << desc->getInstance();
00031     toolbox::net::URN urn("toolbox-mem-pool", poolName.str());
00032     toolbox::mem::HeapAllocator* a = new toolbox::mem::HeapAllocator();
00033     
00034     msgPool_ = toolbox::mem::getMemoryPoolFactory()->createPool(urn, a);
00035   }
00036   
00037   void DiscardManager::configure()
00038   {
00039     proxyCache_.clear();
00040   }
00041   
00042   bool DiscardManager::sendDiscardMessage(I2OChain const& i2oMessage)
00043   {
00044     if (i2oMessage.messageCode() == Header::INVALID)
00045     {
00046       dataSenderMonCollection_.incrementSkippedDiscardCount(i2oMessage);
00047       return false;
00048     }
00049     
00050     unsigned int rbBufferId = i2oMessage.rbBufferId();
00051     std::string hltClassName = i2oMessage.hltClassName();
00052     unsigned int hltInstance = i2oMessage.hltInstance();
00053     FUProxyPtr fuProxyPtr = getProxyFromCache(hltClassName, hltInstance);
00054     if (fuProxyPtr.get() == 0)
00055     {
00056       dataSenderMonCollection_.incrementSkippedDiscardCount(i2oMessage);
00057       std::stringstream msg;
00058       msg << "Unable to find the resource broker corresponding to ";
00059       msg << "classname = \"";
00060       msg << hltClassName;
00061       msg << "\" and instance = \"";
00062       msg << hltInstance;
00063       msg << "\".";
00064       XCEPT_RAISE(exception::RBLookupFailed, msg.str());
00065     }
00066     else
00067     {
00068       if (i2oMessage.messageCode() == Header::DQM_EVENT)
00069       {
00070         fuProxyPtr->sendDQMDiscard(rbBufferId);
00071         dataSenderMonCollection_.incrementDQMDiscardCount(i2oMessage);
00072       }
00073       else
00074       {
00075         fuProxyPtr->sendDataDiscard(rbBufferId);        
00076         dataSenderMonCollection_.incrementDataDiscardCount(i2oMessage);
00077       }
00078     }
00079     
00080     return true;
00081   }
00082   
00083   DiscardManager::FUProxyPtr
00084   DiscardManager::getProxyFromCache
00085   (
00086     std::string const& hltClassName,
00087     unsigned int const& hltInstance
00088   )
00089   {
00090     HLTSenderKey mapKey = std::make_pair(hltClassName, hltInstance);
00091     FUProxyMap::iterator pos = proxyCache_.lower_bound(mapKey);
00092     
00093     if (pos == proxyCache_.end() || (proxyCache_.key_comp()(mapKey, pos->first)))
00094     {
00095       // Use pos as a hint to insert a new record, so it can avoid another lookup
00096       FUProxyPtr fuProxyPtr = makeNewFUProxy(hltClassName, hltInstance);
00097       if (fuProxyPtr.get() != 0)
00098         pos = proxyCache_.insert(pos, FUProxyMap::value_type(mapKey, fuProxyPtr));
00099 
00100       return fuProxyPtr;
00101     }
00102     else
00103     {
00104       return pos->second;
00105     }
00106   }
00107   
00108   DiscardManager::FUProxyPtr
00109   DiscardManager::makeNewFUProxy
00110   (
00111     std::string const& hltClassName,
00112     unsigned int const& hltInstance
00113   )
00114   {
00115     FUProxyPtr proxyPtr;
00116     std::set<xdaq::ApplicationDescriptor*> setOfRBs=
00117       appContext_->getDefaultZone()->
00118       getApplicationDescriptors(hltClassName.c_str());
00119     
00120     std::set<xdaq::ApplicationDescriptor*>::iterator iter;
00121     std::set<xdaq::ApplicationDescriptor*>::iterator iterEnd = setOfRBs.end();
00122     
00123     for (iter = setOfRBs.begin(); iter != iterEnd; ++iter)
00124     {
00125       if ((*iter)->getInstance() == hltInstance)
00126       {
00127         proxyPtr.reset(new FUProxy(appDescriptor_, *iter,
00128             appContext_, msgPool_));
00129         break;
00130       }
00131     }
00132     
00133     return proxyPtr;
00134   }
00135   
00136 } // namespace stor
00137 
00138