CMS 3D CMS Logo

Public Types | Public Member Functions | Private Types | Private Member Functions | Private Attributes

smproxy::DataManager Class Reference

#include <DataManager.h>

List of all members.

Public Types

typedef EventRetriever
< stor::DQMEventConsumerRegistrationInfo,
stor::DQMEventQueueCollectionPtr
DQMEventRetriever

Public Member Functions

 DataManager (StateMachine *)
bool getQueueIDsFromDataEventRetrievers (stor::EventConsRegPtr, stor::QueueIDs &) const
bool getQueueIDsFromDQMEventRetrievers (stor::DQMEventConsRegPtr, stor::QueueIDs &) const
void start (DataRetrieverParams const &)
void stop ()
 ~DataManager ()

Private Types

typedef EventRetriever
< stor::EventConsumerRegistrationInfo,
EventQueueCollectionPtr
DataEventRetriever
typedef std::map
< stor::EventConsRegPtr,
DataEventRetrieverPtr,
stor::utils::ptrComp
< stor::EventConsumerRegistrationInfo > > 
DataEventRetrieverMap
typedef boost::shared_ptr
< DataEventRetriever
DataEventRetrieverPtr
typedef std::map
< stor::DQMEventConsRegPtr,
DQMEventRetrieverPtr,
stor::utils::ptrComp
< stor::DQMEventConsumerRegistrationInfo > > 
DQMEventRetrieverMap
typedef boost::shared_ptr
< DQMEventRetriever
DQMEventRetrieverPtr

Private Member Functions

void activity ()
bool addDQMEventConsumer (stor::RegPtr)
bool addEventConsumer (stor::RegPtr)
void checkForStaleConsumers ()
void doIt ()
void watchDog ()

Private Attributes

DataEventRetrieverMap dataEventRetrievers_
DataRetrieverParams dataRetrieverParams_
DQMEventRetrieverMap dqmEventRetrievers_
stor::RegistrationQueuePtr registrationQueue_
StateMachinestateMachine_
boost::scoped_ptr< boost::thread > thread_
boost::scoped_ptr< boost::thread > watchDogThread_

Detailed Description

Manages the data retrieval

Author:
eulisse
Revision:
1.3
Date:
2013/01/10 15:55:44

Definition at line 36 of file DataManager.h.


Member Typedef Documentation

Definition at line 94 of file DataManager.h.

Definition at line 97 of file DataManager.h.

Definition at line 95 of file DataManager.h.

Definition at line 75 of file DataManager.h.

Definition at line 102 of file DataManager.h.

typedef boost::shared_ptr<DQMEventRetriever> smproxy::DataManager::DQMEventRetrieverPtr [private]

Definition at line 100 of file DataManager.h.


Constructor & Destructor Documentation

smproxy::DataManager::DataManager ( StateMachine stateMachine)

Definition at line 18 of file DataManager.cc.

References checkForStaleConsumers().

    :
  stateMachine_(stateMachine),
  registrationQueue_(stateMachine->getRegistrationQueue())
  {
    watchDogThread_.reset(
      new boost::thread( boost::bind( &DataManager::checkForStaleConsumers, this) )
    );
  }
smproxy::DataManager::~DataManager ( )

Definition at line 29 of file DataManager.cc.

References stop(), and watchDogThread_.

  {
    stop();
    watchDogThread_->interrupt();
    watchDogThread_->join();
  }

Member Function Documentation

void smproxy::DataManager::activity ( ) [private]

Definition at line 103 of file DataManager.cc.

References doIt(), alignCSCRings::e, exception, Exception, smproxy::StateMachine::moveToFailedState(), stateMachine_, and AlCaHLTBitMon_QueryRunRegistry::string.

  {
    try
    {
      doIt();
    }
    catch(xcept::Exception &e)
    {
      stateMachine_->moveToFailedState(e);
    }
    catch(std::exception &e)
    {
      XCEPT_DECLARE(exception::Exception,
        sentinelException, e.what());
      stateMachine_->moveToFailedState(sentinelException);
    }
    catch(...)
    {
      std::string errorMsg = "Unknown exception in watch dog";
      XCEPT_DECLARE(exception::Exception,
        sentinelException, errorMsg);
      stateMachine_->moveToFailedState(sentinelException);
    }
  }
bool smproxy::DataManager::addDQMEventConsumer ( stor::RegPtr  regPtr) [private]

Definition at line 179 of file DataManager.cc.

References dqmEventRetrievers_, pos, and stateMachine_.

Referenced by doIt().

  {
    stor::DQMEventConsRegPtr dqmEventConsumer =
      boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(regPtr);
    
    if ( ! dqmEventConsumer ) return false;

    DQMEventRetrieverMap::iterator pos =
      dqmEventRetrievers_.lower_bound(dqmEventConsumer);
    if (
      pos == dqmEventRetrievers_.end() ||
      (dqmEventRetrievers_.key_comp()(dqmEventConsumer, pos->first)) )
    {
      // no retriever found for this DQM event requests
      DQMEventRetrieverPtr dqmEventRetriever(
        new DQMEventRetriever(stateMachine_, dqmEventConsumer)
      );
      dqmEventRetrievers_.insert(pos,
        DQMEventRetrieverMap::value_type(dqmEventConsumer, dqmEventRetriever));
    }
    else
    {
      pos->second->addConsumer( dqmEventConsumer );
    }
    
    return true;
  }
bool smproxy::DataManager::addEventConsumer ( stor::RegPtr  regPtr) [private]

Definition at line 150 of file DataManager.cc.

References dataEventRetrievers_, pos, and stateMachine_.

Referenced by doIt().

  {
    stor::EventConsRegPtr eventConsumer =
      boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(regPtr);
    
    if ( ! eventConsumer ) return false;

    DataEventRetrieverMap::iterator pos = dataEventRetrievers_.lower_bound(eventConsumer);
    if (
      pos == dataEventRetrievers_.end() ||
      (dataEventRetrievers_.key_comp()(eventConsumer, pos->first))
    )
    {
      // no retriever found for this event requests
      DataEventRetrieverPtr dataEventRetriever(
        new DataEventRetriever(stateMachine_, eventConsumer)
      );
      dataEventRetrievers_.insert(pos,
        DataEventRetrieverMap::value_type(eventConsumer, dataEventRetriever));
    }
    else
    {
      pos->second->addConsumer( eventConsumer );
    }

    return true;
  }
void smproxy::DataManager::checkForStaleConsumers ( ) [private]

Definition at line 238 of file DataManager.cc.

References stor::utils::getCurrentTime(), smproxy::StateMachine::getDQMEventQueueCollection(), smproxy::StateMachine::getEventQueueCollection(), cmsPerfSuiteHarvest::now, seconds(), stor::utils::sleep(), and stateMachine_.

Referenced by DataManager(), and watchDog().

  {
    EventQueueCollectionPtr eventQueueCollection =
      stateMachine_->getEventQueueCollection();
    stor::DQMEventQueueCollectionPtr dqmEventQueueCollection =
      stateMachine_->getDQMEventQueueCollection();
    
    while (true)
    {
      boost::this_thread::sleep(boost::posix_time::seconds(1));
      stor::utils::TimePoint_t now = stor::utils::getCurrentTime();
      eventQueueCollection->clearStaleQueues(now);
      dqmEventQueueCollection->clearStaleQueues(now);
    }
  }
void smproxy::DataManager::doIt ( ) [private]

Definition at line 129 of file DataManager.cc.

References addDQMEventConsumer(), addEventConsumer(), smproxy::DQMArchiver::getRegPtr(), LaserDQM_cfg::process, registrationQueue_, and stateMachine_.

Referenced by activity(), and start().

  {
    stor::RegPtr regPtr;
    bool process(true);

    DQMArchiver dqmArchiver(stateMachine_);
    addDQMEventConsumer(dqmArchiver.getRegPtr());

    do
    {
      registrationQueue_->deqWait(regPtr);

      if ( ! (addEventConsumer(regPtr) || addDQMEventConsumer(regPtr)) )
      {
        // base type received, signalling the end of the run
        process = false;
      }
    } while (process);
  }
bool smproxy::DataManager::getQueueIDsFromDataEventRetrievers ( stor::EventConsRegPtr  eventConsumer,
stor::QueueIDs queueIDs 
) const

Get list of data event consumer queueIDs for given event type. Returns false if the event type is not found.

Definition at line 70 of file DataManager.cc.

References pos.

  {
    if ( ! eventConsumer ) return false;
    
    DataEventRetrieverMap::const_iterator pos =
      dataEventRetrievers_.find(eventConsumer);
    if ( pos == dataEventRetrievers_.end() ) return false;
    
    queueIDs = pos->second->getQueueIDs();
    return true;
  }
bool smproxy::DataManager::getQueueIDsFromDQMEventRetrievers ( stor::DQMEventConsRegPtr  dqmEventConsumer,
stor::QueueIDs queueIDs 
) const

Get list of DQM event consumer queueIDs for given event type. Returns false if the event type is not found.

Definition at line 87 of file DataManager.cc.

References pos.

  {
    if ( ! dqmEventConsumer ) return false;
    
    DQMEventRetrieverMap::const_iterator pos =
      dqmEventRetrievers_.find(dqmEventConsumer);
    if ( pos == dqmEventRetrievers_.end() ) return false;
    
    queueIDs = pos->second->getQueueIDs();
    return true;
  }
void smproxy::DataManager::start ( DataRetrieverParams const &  drp)

Start retrieving data

Definition at line 37 of file DataManager.cc.

References dataEventRetrievers_, dataRetrieverParams_, doIt(), dqmEventRetrievers_, edm::shutdown_flag, and thread_.

  {
    dataRetrieverParams_ = drp;
    dataEventRetrievers_.clear();
    dqmEventRetrievers_.clear();
    edm::shutdown_flag = false;
    thread_.reset(
      new boost::thread( boost::bind( &DataManager::doIt, this) )
    );
  }
void smproxy::DataManager::stop ( )

Stop retrieving data

Definition at line 49 of file DataManager.cc.

References dataEventRetrievers_, dqmEventRetrievers_, registrationQueue_, edm::shutdown_flag, and thread_.

Referenced by ~DataManager().

  {
    // enqueue a dummy RegistrationInfoBase to tell the thread to stop
    registrationQueue_->enqWait( stor::RegPtr() );
    thread_->join();

    edm::shutdown_flag = true;

    BOOST_FOREACH(
      const DataEventRetrieverMap::value_type& pair,
      dataEventRetrievers_
    ) pair.second->stop();

    BOOST_FOREACH(
      const DQMEventRetrieverMap::value_type& pair,
      dqmEventRetrievers_
    ) pair.second->stop();
  }
void smproxy::DataManager::watchDog ( ) [private]

Definition at line 208 of file DataManager.cc.

References checkForStaleConsumers(), alignCSCRings::e, exception, Exception, smproxy::StateMachine::moveToFailedState(), stateMachine_, and AlCaHLTBitMon_QueryRunRegistry::string.

  {
    try
    {
      checkForStaleConsumers();
    }
    catch(boost::thread_interrupted)
    {
      // thread was interrupted.
    }
    catch(xcept::Exception &e)
    {
      stateMachine_->moveToFailedState(e);
    }
    catch(std::exception &e)
    {
      XCEPT_DECLARE(exception::Exception,
        sentinelException, e.what());
      stateMachine_->moveToFailedState(sentinelException);
    }
    catch(...)
    {
      std::string errorMsg = "Unknown exception in watch dog";
      XCEPT_DECLARE(exception::Exception,
        sentinelException, errorMsg);
      stateMachine_->moveToFailedState(sentinelException);
    }
  }

Member Data Documentation

Definition at line 98 of file DataManager.h.

Referenced by addEventConsumer(), start(), and stop().

Definition at line 88 of file DataManager.h.

Referenced by start().

Definition at line 103 of file DataManager.h.

Referenced by addDQMEventConsumer(), start(), and stop().

Definition at line 87 of file DataManager.h.

Referenced by doIt(), and stop().

boost::scoped_ptr<boost::thread> smproxy::DataManager::thread_ [private]

Definition at line 90 of file DataManager.h.

Referenced by start(), and stop().

boost::scoped_ptr<boost::thread> smproxy::DataManager::watchDogThread_ [private]

Definition at line 91 of file DataManager.h.

Referenced by ~DataManager().