#include <DataManager.h>
Manages the data retrieval
Definition at line 36 of file DataManager.h.
typedef EventRetriever<stor::EventConsumerRegistrationInfo, EventQueueCollectionPtr> smproxy::DataManager::DataEventRetriever [private] |
Definition at line 92 of file DataManager.h.
typedef std::map<stor::EventConsRegPtr, DataEventRetrieverPtr, stor::utils::ptrComp<stor::EventConsumerRegistrationInfo> > smproxy::DataManager::DataEventRetrieverMap [private] |
Definition at line 95 of file DataManager.h.
typedef boost::shared_ptr<DataEventRetriever> smproxy::DataManager::DataEventRetrieverPtr [private] |
Definition at line 93 of file DataManager.h.
typedef EventRetriever<stor::DQMEventConsumerRegistrationInfo, stor::DQMEventQueueCollectionPtr> smproxy::DataManager::DQMEventRetriever [private] |
Definition at line 99 of file DataManager.h.
typedef std::map<stor::DQMEventConsRegPtr, DQMEventRetrieverPtr, stor::utils::ptrComp<stor::DQMEventConsumerRegistrationInfo> > smproxy::DataManager::DQMEventRetrieverMap [private] |
Definition at line 102 of file DataManager.h.
typedef boost::shared_ptr<DQMEventRetriever> smproxy::DataManager::DQMEventRetrieverPtr [private] |
Definition at line 100 of file DataManager.h.
smproxy::DataManager::DataManager | ( | StateMachine * | stateMachine | ) |
Definition at line 18 of file DataManager.cc.
References checkForStaleConsumers().
: stateMachine_(stateMachine),
  registrationQueue_(stateMachine->getRegistrationQueue())
{
  // Constructor body: remembers the state machine and its registration
  // queue, then launches the watch-dog thread.
  //
  // The thread is started on watchDog() instead of directly on
  // checkForStaleConsumers(). watchDog() calls checkForStaleConsumers()
  // inside a try block, swallows boost::thread_interrupted (raised when the
  // destructor interrupts the thread) and reports any other exception via
  // stateMachine_->moveToFailedState(). Binding checkForStaleConsumers()
  // directly would let such exceptions escape the thread entry point.
  watchDogThread_.reset(
    new boost::thread( boost::bind( &DataManager::watchDog, this ) )
  );
}
smproxy::DataManager::~DataManager | ( | ) |
Definition at line 29 of file DataManager.cc.
References stop(), and watchDogThread_.
{
  // Shut down the data retrieval first ...
  stop();

  // ... then interrupt the watch-dog thread and wait until it has finished.
  boost::thread& watcher = *watchDogThread_;
  watcher.interrupt();
  watcher.join();
}
void smproxy::DataManager::activity | ( | ) | [private] |
Definition at line 103 of file DataManager.cc.
References doIt(), alignCSCRings::e, exception, Exception, smproxy::StateMachine::moveToFailedState(), and stateMachine_.
{
  // Exception barrier around doIt(): every exception is converted into a
  // transition of the state machine to the failed state.
  try
  {
    doIt();
  }
  catch(xcept::Exception &e)
  {
    // XDAQ exceptions are forwarded unchanged
    stateMachine_->moveToFailedState(e);
  }
  catch(const std::exception &e)
  {
    // wrap standard exceptions, keeping their message
    XCEPT_DECLARE(exception::Exception,
      sentinelException, e.what());
    stateMachine_->moveToFailedState(sentinelException);
  }
  catch(...)
  {
    // The message names this method; it previously read "watch dog",
    // which pointed at the wrong thread when diagnosing a failure.
    const std::string errorMsg = "Unknown exception in data manager activity";
    XCEPT_DECLARE(exception::Exception,
      sentinelException, errorMsg);
    stateMachine_->moveToFailedState(sentinelException);
  }
}
bool smproxy::DataManager::addDQMEventConsumer | ( | stor::RegPtr | regPtr | ) | [private] |
Definition at line 179 of file DataManager.cc.
References dqmEventRetrievers_, pos, and stateMachine_.
Referenced by doIt().
{
  // Registers a DQM event consumer.
  // Returns false if regPtr is not a DQMEventConsumerRegistrationInfo,
  // true once the consumer is attached to a (new or existing) retriever.
  stor::DQMEventConsRegPtr consumer =
    boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(regPtr);
  if ( ! consumer )
  {
    return false;
  }

  // lower_bound yields either the matching entry or the insertion hint
  DQMEventRetrieverMap::iterator hint = dqmEventRetrievers_.lower_bound(consumer);
  const bool retrieverExists =
    ( hint != dqmEventRetrievers_.end() ) &&
    ! dqmEventRetrievers_.key_comp()(consumer, hint->first);

  if ( retrieverExists )
  {
    // an equivalent request is already served: share its retriever
    hint->second->addConsumer( consumer );
    return true;
  }

  // no retriever exists yet for this DQM event request
  DQMEventRetrieverPtr retriever(
    new DQMEventRetriever(stateMachine_, consumer)
  );
  dqmEventRetrievers_.insert(hint,
    DQMEventRetrieverMap::value_type(consumer, retriever));
  return true;
}
bool smproxy::DataManager::addEventConsumer | ( | stor::RegPtr | regPtr | ) | [private] |
Definition at line 150 of file DataManager.cc.
References dataEventRetrievers_, pos, and stateMachine_.
Referenced by doIt().
{
  // Registers a data event consumer.
  // Returns false if regPtr is not an EventConsumerRegistrationInfo,
  // true once the consumer is attached to a (new or existing) retriever.
  stor::EventConsRegPtr consumer =
    boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(regPtr);
  if ( ! consumer )
  {
    return false;
  }

  // lower_bound yields either the matching entry or the insertion hint
  DataEventRetrieverMap::iterator hint = dataEventRetrievers_.lower_bound(consumer);
  const bool retrieverExists =
    ( hint != dataEventRetrievers_.end() ) &&
    ! dataEventRetrievers_.key_comp()(consumer, hint->first);

  if ( retrieverExists )
  {
    // an equivalent request is already served: share its retriever
    hint->second->addConsumer( consumer );
    return true;
  }

  // no retriever exists yet for this event request
  DataEventRetrieverPtr retriever(
    new DataEventRetriever(stateMachine_, consumer)
  );
  dataEventRetrievers_.insert(hint,
    DataEventRetrieverMap::value_type(consumer, retriever));
  return true;
}
void smproxy::DataManager::checkForStaleConsumers | ( | ) | [private] |
Definition at line 238 of file DataManager.cc.
References stor::utils::getCurrentTime(), smproxy::StateMachine::getDQMEventQueueCollection(), smproxy::StateMachine::getEventQueueCollection(), cmsPerfSuiteHarvest::now, seconds(), stor::utils::sleep(), and stateMachine_.
Referenced by DataManager(), and watchDog().
{
  // Once per second, asks the event and DQM event queue collections to
  // clear their stale queues. The loop has no exit condition of its own;
  // the destructor interrupts the watch-dog thread that runs it.
  EventQueueCollectionPtr eventQueues =
    stateMachine_->getEventQueueCollection();
  stor::DQMEventQueueCollectionPtr dqmEventQueues =
    stateMachine_->getDQMEventQueueCollection();

  const boost::posix_time::seconds pollInterval(1);

  for (;;)
  {
    boost::this_thread::sleep(pollInterval);

    // use one common time stamp for both collections
    stor::utils::TimePoint_t now = stor::utils::getCurrentTime();
    eventQueues->clearStaleQueues(now);
    dqmEventQueues->clearStaleQueues(now);
  }
}
void smproxy::DataManager::doIt | ( | ) | [private] |
Definition at line 129 of file DataManager.cc.
References addDQMEventConsumer(), addEventConsumer(), smproxy::DQMArchiver::getRegPtr(), LaserDQM_cfg::process, registrationQueue_, and stateMachine_.
Referenced by activity(), and start().
{
  // Registers the DQM archiver, then serves consumer registrations from the
  // registration queue until one arrives that is neither an event consumer
  // nor a DQM event consumer.
  stor::RegPtr regPtr;

  DQMArchiver dqmArchiver(stateMachine_);
  addDQMEventConsumer(dqmArchiver.getRegPtr());

  for (;;)
  {
    registrationQueue_->deqWait(regPtr);

    // try the data event type first, the DQM event type only if that fails
    const bool handled =
      addEventConsumer(regPtr) || addDQMEventConsumer(regPtr);

    if ( ! handled )
    {
      // neither consumer type matched (e.g. the empty RegPtr enqueued by
      // stop()): this signals the end of the run
      break;
    }
  }
}
bool smproxy::DataManager::getQueueIDsFromDataEventRetrievers | ( | stor::EventConsRegPtr | eventConsumer, |
stor::QueueIDs & | queueIDs | ||
) | const |
Get list of data event consumer queueIDs for given event type. Returns false if the event type is not found.
Definition at line 70 of file DataManager.cc.
References pos.
{
  // Copies the queue IDs of the retriever serving eventConsumer into
  // queueIDs. queueIDs is left untouched and false is returned when
  // eventConsumer is null or no retriever is registered for it.
  if ( eventConsumer )
  {
    const DataEventRetrieverMap::const_iterator found =
      dataEventRetrievers_.find(eventConsumer);

    if ( found != dataEventRetrievers_.end() )
    {
      queueIDs = found->second->getQueueIDs();
      return true;
    }
  }
  return false;
}
bool smproxy::DataManager::getQueueIDsFromDQMEventRetrievers | ( | stor::DQMEventConsRegPtr | dqmEventConsumer, |
stor::QueueIDs & | queueIDs | ||
) | const |
Get list of DQM event consumer queueIDs for given event type. Returns false if the event type is not found.
Definition at line 87 of file DataManager.cc.
References pos.
{
  // Copies the queue IDs of the retriever serving dqmEventConsumer into
  // queueIDs. queueIDs is left untouched and false is returned when
  // dqmEventConsumer is null or no retriever is registered for it.
  if ( dqmEventConsumer )
  {
    const DQMEventRetrieverMap::const_iterator found =
      dqmEventRetrievers_.find(dqmEventConsumer);

    if ( found != dqmEventRetrievers_.end() )
    {
      queueIDs = found->second->getQueueIDs();
      return true;
    }
  }
  return false;
}
void smproxy::DataManager::start | ( | DataRetrieverParams const & | drp | ) |
Start retrieving data
Definition at line 37 of file DataManager.cc.
References dataEventRetrievers_, dataRetrieverParams_, doIt(), dqmEventRetrievers_, edm::shutdown_flag, and thread_.
{
  // Starts a new retrieval cycle: stores the parameters, drops the
  // retrievers of any previous cycle, clears the shutdown flag and launches
  // the worker thread.
  dataRetrieverParams_ = drp;
  dataEventRetrievers_.clear();
  dqmEventRetrievers_.clear();

  edm::shutdown_flag = false;

  // The thread is started on activity() instead of directly on doIt().
  // activity() runs doIt() inside a try block and reports any exception via
  // stateMachine_->moveToFailedState(); binding doIt() directly would let
  // exceptions escape the thread entry point.
  thread_.reset(
    new boost::thread( boost::bind( &DataManager::activity, this ) )
  );
}
void smproxy::DataManager::stop | ( | ) |
Stop retrieving data
Definition at line 49 of file DataManager.cc.
References dataEventRetrievers_, dqmEventRetrievers_, registrationQueue_, edm::shutdown_flag, and thread_.
Referenced by ~DataManager().
{
  // Stops the worker thread (if one was started) and all event retrievers.
  //
  // thread_ is only set by start(), while the destructor always calls
  // stop(); without this guard a DataManager that was never started would
  // dereference an empty scoped_ptr.
  if ( thread_ )
  {
    // enqueue an empty registration to tell the thread to stop
    registrationQueue_->enqWait( stor::RegPtr() );
    thread_->join();

    // Forget the joined thread so that a repeated stop() neither joins it
    // again nor leaves a stale stop request in the registration queue.
    thread_.reset();
  }

  edm::shutdown_flag = true;

  BOOST_FOREACH(
    const DataEventRetrieverMap::value_type& entry,
    dataEventRetrievers_
  )
    entry.second->stop();

  BOOST_FOREACH(
    const DQMEventRetrieverMap::value_type& entry,
    dqmEventRetrievers_
  )
    entry.second->stop();
}
void smproxy::DataManager::watchDog | ( | ) | [private] |
Definition at line 208 of file DataManager.cc.
References checkForStaleConsumers(), alignCSCRings::e, exception, Exception, smproxy::StateMachine::moveToFailedState(), and stateMachine_.
{
  // Exception barrier around checkForStaleConsumers(): an interruption ends
  // the method quietly, any other exception moves the state machine to the
  // failed state.
  try
  {
    checkForStaleConsumers();
  }
  catch(const boost::thread_interrupted&)
  {
    // the thread was interrupted: nothing to report
  }
  catch(xcept::Exception &e)
  {
    // XDAQ exceptions are forwarded unchanged
    stateMachine_->moveToFailedState(e);
  }
  catch(const std::exception &e)
  {
    // wrap standard exceptions, keeping their message
    XCEPT_DECLARE(exception::Exception,
      sentinelException, e.what());
    stateMachine_->moveToFailedState(sentinelException);
  }
  catch(...)
  {
    const std::string errorMsg("Unknown exception in watch dog");
    XCEPT_DECLARE(exception::Exception,
      sentinelException, errorMsg);
    stateMachine_->moveToFailedState(sentinelException);
  }
}
DataEventRetrieverMap smproxy::DataManager::dataEventRetrievers_ [private] |
Definition at line 96 of file DataManager.h.
Referenced by addEventConsumer(), start(), and stop().
DataRetrieverParams smproxy::DataManager::dataRetrieverParams_ [private] |
Definition at line 86 of file DataManager.h.
Referenced by start().
DQMEventRetrieverMap smproxy::DataManager::dqmEventRetrievers_ [private] |
Definition at line 103 of file DataManager.h.
Referenced by addDQMEventConsumer(), start(), and stop().
smproxy::DataManager::registrationQueue_ [private] |
Definition at line 85 of file DataManager.h.
StateMachine* smproxy::DataManager::stateMachine_ [private] |
Definition at line 84 of file DataManager.h.
Referenced by activity(), addDQMEventConsumer(), addEventConsumer(), checkForStaleConsumers(), doIt(), and watchDog().
boost::scoped_ptr<boost::thread> smproxy::DataManager::thread_ [private] |
Definition at line 88 of file DataManager.h.
boost::scoped_ptr<boost::thread> smproxy::DataManager::watchDogThread_ [private] |
Definition at line 89 of file DataManager.h.
Referenced by ~DataManager().