CMS 3D CMS Logo

Public Member Functions | Private Types | Private Member Functions | Private Attributes

stor::StorageManager Class Reference

#include <StorageManager.h>

List of all members.

Public Member Functions

 StorageManager (xdaq::ApplicationStub *s)

Private Types

typedef ConsumerUtils< Configuration, EventQueueCollection > ConsumerUtils_t

Private Member Functions

void bindConsumerCallbacks ()
void bindI2OCallbacks ()
void bindStateMachineCallbacks ()
void bindWebInterfaceCallbacks ()
void consumerListWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
void consumerStatisticsPage (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception )
void css (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
void defaultWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
void dqmEventStatisticsWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
void fileStatisticsWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
xoap::MessageReference handleFSMSoapMessage (xoap::MessageReference) throw ( xoap::exception::Exception )
void initializeSharedResources ()
void inputWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
StorageManager & operator= (StorageManager const &)
void processConsumerEventRequest (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception )
void processConsumerHeaderRequest (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception )
void processConsumerRegistrationRequest (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception )
void processDQMConsumerEventRequest (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception )
void processDQMConsumerRegistrationRequest (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception )
void rbsenderDetailWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
void rbsenderWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
void receiveDataMessage (toolbox::mem::Reference *ref)
void receiveDQMMessage (toolbox::mem::Reference *ref)
void receiveEndOfLumiSectionMessage (toolbox::mem::Reference *ref)
void receiveErrorDataMessage (toolbox::mem::Reference *ref)
void receiveRegistryMessage (toolbox::mem::Reference *ref)
void startWorkerThreads ()
 StorageManager (StorageManager const &)
void storedDataWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
void throughputWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)

Private Attributes

boost::scoped_ptr< ConsumerUtils_t > consumerUtils_
boost::scoped_ptr< DiskWriter > diskWriter_
boost::scoped_ptr< DQMEventProcessor > dqmEventProcessor_
boost::scoped_ptr< FragmentProcessor > fragmentProcessor_
SharedResourcesPtr sharedResources_
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_

Detailed Description

Main class of the StorageManager XDAQ application

Author:
mommsen
Revision:
1.61
Date:
2011/11/17 17:35:40

Definition at line 46 of file StorageManager.h.


Member Typedef Documentation

Definition at line 230 of file StorageManager.h.


Constructor & Destructor Documentation

StorageManager::StorageManager ( xdaq::ApplicationStub *  s)

Definition at line 39 of file StorageManager.cc.

References bindConsumerCallbacks(), bindI2OCallbacks(), bindStateMachineCallbacks(), bindWebInterfaceCallbacks(), alignCSCRings::e, exception, Exception, initializeSharedResources(), and startWorkerThreads().

                                                      :
  xdaq::Application(s)
{
  // Construction sequence: bind all callbacks, create the shared resources,
  // then start the worker threads. A failure while creating the shared
  // resources is logged as fatal and raised as stor::exception::Exception.
  LOG4CPLUS_INFO(this->getApplicationLogger(),"Making StorageManager");

  // bind all callback functions
  bindI2OCallbacks();
  bindStateMachineCallbacks();
  bindWebInterfaceCallbacks();
  bindConsumerCallbacks();

  std::string errorMsg = "Exception in StorageManager constructor: ";
  try
  {
    initializeSharedResources();
  }
  catch(std::exception &e)
  {
    // Log and raise the prefixed message (not the bare what() text), so that
    // both carry the constructor context, exactly as the catch-all branch
    // below does.
    errorMsg += e.what();
    LOG4CPLUS_FATAL( getApplicationLogger(), errorMsg );
    XCEPT_RAISE( stor::exception::Exception, errorMsg );
  }
  catch(...)
  {
    errorMsg += "unknown exception";
    LOG4CPLUS_FATAL( getApplicationLogger(), errorMsg );
    XCEPT_RAISE( stor::exception::Exception, errorMsg );
  }

  startWorkerThreads();
}
stor::StorageManager::StorageManager ( StorageManager const &  ) [private]

Member Function Documentation

void StorageManager::bindConsumerCallbacks ( ) [private]

Bind callbacks for consumers

Definition at line 138 of file StorageManager.cc.

References processConsumerEventRequest(), processConsumerHeaderRequest(), processConsumerRegistrationRequest(), processDQMConsumerEventRequest(), and processDQMConsumerRegistrationRequest().

Referenced by StorageManager().

{
  // Requests issued by event consumers.
  xgi::bind(this, &StorageManager::processConsumerRegistrationRequest, "registerConsumer");
  xgi::bind(this, &StorageManager::processConsumerHeaderRequest,       "getregdata");
  xgi::bind(this, &StorageManager::processConsumerEventRequest,        "geteventdata");

  // Requests issued by DQM event consumers.
  xgi::bind(this, &StorageManager::processDQMConsumerRegistrationRequest, "registerDQMConsumer");
  xgi::bind(this, &StorageManager::processDQMConsumerEventRequest,        "getDQMeventdata");
}
void StorageManager::bindI2OCallbacks ( ) [private]

Bind callbacks for I2O messages

Definition at line 72 of file StorageManager.cc.

References I2O_SM_DATA, I2O_SM_DQM, I2O_SM_ERROR, I2O_SM_PREAMBLE, receiveDataMessage(), receiveDQMMessage(), receiveEndOfLumiSectionMessage(), receiveErrorDataMessage(), and receiveRegistryMessage().

Referenced by StorageManager().

{
  // One registration per I2O function code; each one is bound together
  // with XDAQ_ORGANIZATION_ID.
  i2o::bind( this, &StorageManager::receiveRegistryMessage,         I2O_SM_PREAMBLE,     XDAQ_ORGANIZATION_ID );
  i2o::bind( this, &StorageManager::receiveDataMessage,             I2O_SM_DATA,         XDAQ_ORGANIZATION_ID );
  i2o::bind( this, &StorageManager::receiveErrorDataMessage,        I2O_SM_ERROR,        XDAQ_ORGANIZATION_ID );
  i2o::bind( this, &StorageManager::receiveDQMMessage,              I2O_SM_DQM,          XDAQ_ORGANIZATION_ID );
  i2o::bind( this, &StorageManager::receiveEndOfLumiSectionMessage, I2O_EVM_LUMISECTION, XDAQ_ORGANIZATION_ID );
}
void StorageManager::bindStateMachineCallbacks ( ) [private]

Bind callbacks for state machine SOAP messages

Definition at line 97 of file StorageManager.cc.

References handleFSMSoapMessage().

Referenced by StorageManager().

{
  // Every state machine command is served by the same SOAP handler, so the
  // registrations differ only in the command name.
  static const char* const fsmCommands[] =
    { "Configure", "Enable", "Stop", "Halt", "EmergencyStop" };
  const unsigned int commandCount = sizeof(fsmCommands) / sizeof(fsmCommands[0]);

  for ( unsigned int idx = 0; idx < commandCount; ++idx )
  {
    xoap::bind( this,
                &StorageManager::handleFSMSoapMessage,
                fsmCommands[idx],
                XDAQ_NS_URI );
  }
}
void StorageManager::bindWebInterfaceCallbacks ( ) [private]

Bind callbacks for web interface

Definition at line 122 of file StorageManager.cc.

References consumerListWebPage(), consumerStatisticsPage(), css(), defaultWebPage(), dqmEventStatisticsWebPage(), fileStatisticsWebPage(), inputWebPage(), rbsenderDetailWebPage(), rbsenderWebPage(), storedDataWebPage(), and throughputWebPage().

Referenced by StorageManager().

{
  // Map each web interface callback onto the name it is registered under.
  xgi::bind( this, &StorageManager::css,                       "styles.css" );
  xgi::bind( this, &StorageManager::defaultWebPage,            "Default" );
  xgi::bind( this, &StorageManager::inputWebPage,              "input" );
  xgi::bind( this, &StorageManager::storedDataWebPage,         "storedData" );
  xgi::bind( this, &StorageManager::rbsenderWebPage,           "rbsenderlist" );
  xgi::bind( this, &StorageManager::rbsenderDetailWebPage,     "rbsenderdetail" );
  xgi::bind( this, &StorageManager::fileStatisticsWebPage,     "fileStatistics" );
  xgi::bind( this, &StorageManager::dqmEventStatisticsWebPage, "dqmEventStatistics" );
  xgi::bind( this, &StorageManager::consumerStatisticsPage,    "consumerStatistics" );
  xgi::bind( this, &StorageManager::consumerListWebPage,       "consumerList" );
  xgi::bind( this, &StorageManager::throughputWebPage,         "throughputStatistics" );
}
void StorageManager::consumerListWebPage ( xgi::Input *  in,
xgi::Output *  out 
) throw (xgi::exception::Exception) [private]

Callback returning an XML list of consumer information. The current implementation just returns an empty document.

Definition at line 603 of file StorageManager.cc.

References dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

{
  // Set the response headers: the reply is declared as binary octet stream.
  out->getHTTPResponseHeader().addHeader( "Content-Type", "application/octet-stream" );
  out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding", "binary" );

  // Empty reply: write() is called with a length of 0, the local only
  // provides an address to pass.
  char placeholder;
  out->write( &placeholder, 0 );
}
void StorageManager::consumerStatisticsPage ( xgi::Input *  in,
xgi::Output *  out 
) throw ( xgi::exception::Exception ) [private]

Webinterface callback creating web page showing the connected consumers

Definition at line 425 of file StorageManager.cc.

References alignCSCRings::e, exception, Exception, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

{
  // Rendering is delegated to the web page helper; any failure is logged and
  // raised as xgi::exception::Exception with a descriptive text.
  std::string failureText( "Failed to create consumer statistics page" );

  try
  {
    smWebPageHelper_->consumerStatistics( out );
  }
  catch ( const std::exception& ex )
  {
    failureText.append( ": " ).append( ex.what() );
    LOG4CPLUS_ERROR( getApplicationLogger(), failureText );
    XCEPT_RAISE( xgi::exception::Exception, failureText );
  }
  catch ( ... )
  {
    failureText.append( ": Unknown exception" );
    LOG4CPLUS_ERROR( getApplicationLogger(), failureText );
    XCEPT_RAISE( xgi::exception::Exception, failureText );
  }
}
void StorageManager::css ( xgi::Input *  in,
xgi::Output *  out 
) throw (xgi::exception::Exception) [private]

Webinterface callback for style sheet

Definition at line 337 of file StorageManager.cc.

References recoMuon::in, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

{
  // Style sheet generation is delegated to the web page helper.
  smWebPageHelper_->css(in,out);
}
void StorageManager::defaultWebPage ( xgi::Input *  in,
xgi::Output *  out 
) throw (xgi::exception::Exception) [private]

Webinterface callback creating default web page

Definition at line 344 of file StorageManager.cc.

References alignCSCRings::e, exception, Exception, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

{
  // Rendering is delegated to the web page helper; any failure is logged and
  // raised as xgi::exception::Exception with a descriptive text.
  std::string failureText( "Failed to create the default webpage" );

  try
  {
    smWebPageHelper_->defaultWebPage( out );
  }
  catch ( const std::exception& ex )
  {
    failureText.append( ": " ).append( ex.what() );
    LOG4CPLUS_ERROR( getApplicationLogger(), failureText );
    XCEPT_RAISE( xgi::exception::Exception, failureText );
  }
  catch ( ... )
  {
    failureText.append( ": Unknown exception" );
    LOG4CPLUS_ERROR( getApplicationLogger(), failureText );
    XCEPT_RAISE( xgi::exception::Exception, failureText );
  }
}
void StorageManager::dqmEventStatisticsWebPage ( xgi::Input *  in,
xgi::Output *  out 
) throw (xgi::exception::Exception) [private]

Webinterface callback creating web page showing statistics about the processed DQM events.

Definition at line 547 of file StorageManager.cc.

References alignCSCRings::e, exception, Exception, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

{
  // Rendering is delegated to the web page helper; any failure is logged and
  // raised as xgi::exception::Exception with a descriptive text.
  std::string failureText( "Failed to create the DQM event statistics webpage" );

  try
  {
    smWebPageHelper_->dqmEventWebPage( out );
  }
  catch ( const std::exception& ex )
  {
    failureText.append( ": " ).append( ex.what() );
    LOG4CPLUS_ERROR( getApplicationLogger(), failureText );
    XCEPT_RAISE( xgi::exception::Exception, failureText );
  }
  catch ( ... )
  {
    failureText.append( ": Unknown exception" );
    LOG4CPLUS_ERROR( getApplicationLogger(), failureText );
    XCEPT_RAISE( xgi::exception::Exception, failureText );
  }
}
void StorageManager::fileStatisticsWebPage ( xgi::Input *  in,
xgi::Output *  out 
) throw (xgi::exception::Exception) [private]

Webinterface callback creating web page showing information about recently written files

Definition at line 519 of file StorageManager.cc.

References alignCSCRings::e, exception, Exception, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

{
  // Rendering is delegated to the web page helper; any failure is logged and
  // raised as xgi::exception::Exception with a descriptive text.
  std::string failureText( "Failed to create the file statistics webpage" );

  try
  {
    smWebPageHelper_->filesWebPage( out );
  }
  catch ( const std::exception& ex )
  {
    failureText.append( ": " ).append( ex.what() );
    LOG4CPLUS_ERROR( getApplicationLogger(), failureText );
    XCEPT_RAISE( xgi::exception::Exception, failureText );
  }
  catch ( ... )
  {
    failureText.append( ": Unknown exception" );
    LOG4CPLUS_ERROR( getApplicationLogger(), failureText );
    XCEPT_RAISE( xgi::exception::Exception, failureText );
  }
}
xoap::MessageReference StorageManager::handleFSMSoapMessage ( xoap::MessageReference  msg) throw ( xoap::exception::Exception ) [private]

Callback for a SOAP message containing a state machine event, possibly including new configuration values

Definition at line 619 of file StorageManager.cc.

References edmPickEvents::command, stor::soaputils::createFsmSoapResponseMsg(), alignCSCRings::e, exception, Exception, cms::Exception::explainSelf(), stor::soaputils::extractParameters(), and lumiQueryAPI::msg.

Referenced by bindStateMachineCallbacks().

{
  // Handles a state machine SOAP request: extracts the command from the
  // message, enqueues the matching state machine event on the command queue
  // and returns a SOAP reply built from the externally visible state.
  // Every failure is converted into a xoap::exception::Exception, handed to
  // the alarm handler's moveToFailedState() and then thrown to the caller.
  std::string errorMsg;
  xoap::MessageReference returnMsg;

  try {
    // errorMsg is re-assigned before each stage; the catch handlers below
    // use whichever prefix is current at the time of the throw.
    errorMsg = "Failed to extract FSM event and parameters from SOAP message: ";
    std::string command = soaputils::extractParameters(msg, this);

    errorMsg = "Failed to put a '" + command + "' state machine event into command queue: ";
    if (command == "Configure")
    {
      sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Configure() ) );
    }
    else if (command == "Enable")
    {
      // A changed stream configuration queues a Reconfigure event ahead of
      // the Enable event.
      if (sharedResources_->configuration_->streamConfigurationHasChanged())
      {
        sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Reconfigure() ) );
      }
      sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Enable() ) );
    }
    else if (command == "Stop")
    {
      sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Stop() ) );
    }
    else if (command == "Halt")
    {
      sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Halt() ) );
    }
    else if (command == "EmergencyStop")
    {
      sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::EmergencyStop() ) );
    }
    else
    {
      // Commands other than the five above are rejected.
      XCEPT_RAISE(stor::exception::StateMachine,
        "Received an unknown state machine event '" + command + "'.");
    }

    errorMsg = "Failed to create FSM SOAP reply message: ";
    returnMsg = soaputils::createFsmSoapResponseMsg(command,
      sharedResources_->statisticsReporter_->
      getStateMachineMonitorCollection().externallyVisibleState());
  }
  catch (cms::Exception& e) {
    // cms::Exception: append its self-explanation to the stage prefix.
    errorMsg += e.explainSelf();
    XCEPT_DECLARE(xoap::exception::Exception,
      sentinelException, errorMsg);
    sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
    throw sentinelException;
  }
  catch (xcept::Exception &e) {
    // xcept::Exception: the caught exception is nested into the new one
    // instead of being appended to the text.
    XCEPT_DECLARE_NESTED(xoap::exception::Exception,
      sentinelException, errorMsg, e);
    sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
    throw sentinelException;
  }
  catch (std::exception& e) {
    errorMsg += e.what();
    XCEPT_DECLARE(xoap::exception::Exception,
      sentinelException, errorMsg);
    sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
    throw sentinelException;
  }
  catch (...) {
    errorMsg += "Unknown exception";
    XCEPT_DECLARE(xoap::exception::Exception,
      sentinelException, errorMsg);
    sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
    throw sentinelException;
  }

  return returnMsg;
}
void StorageManager::initializeSharedResources ( ) [private]

Initialize the shared resources

Definition at line 151 of file StorageManager.cc.

References stor::QueueConfigurationParams::commandQueueSize_, edm::errors::Configuration, consumerUtils_, stor::QueueConfigurationParams::dqmEventQueueMemoryLimitMB_, stor::QueueConfigurationParams::dqmEventQueueSize_, stor::QueueConfigurationParams::fragmentQueueMemoryLimitMB_, stor::QueueConfigurationParams::fragmentQueueSize_, instance, stor::QueueConfigurationParams::registrationQueueSize_, reset(), sharedResources_, smWebPageHelper_, stor::QueueConfigurationParams::streamQueueMemoryLimitMB_, and stor::QueueConfigurationParams::streamQueueSize_.

Referenced by StorageManager().

{
  // Creates the SharedResources object and populates its members one by one,
  // then creates the consumer utilities and the web page helper on top of it.
  // Several later steps read members set by earlier ones (e.g. the queues
  // are sized from configuration_, the monitors come from
  // statisticsReporter_), so the statement order matters.
  sharedResources_.reset(new SharedResources());

  // The configuration is built from the application info space and the
  // instance number of this application.
  xdata::InfoSpace *ispace = getApplicationInfoSpace();
  unsigned long instance = getApplicationDescriptor()->getInstance();
  sharedResources_->configuration_.reset(new Configuration(ispace, instance));

  // Queues, sized from the queue configuration parameters. The *MemoryLimitMB_
  // values are multiplied by 1024*1024, presumably to convert MB to bytes.
  // NOTE(review): whether that product can overflow depends on the member
  // types, which are not visible here -- confirm.
  QueueConfigurationParams queueParams =
    sharedResources_->configuration_->getQueueConfigurationParams();
  sharedResources_->commandQueue_.
    reset(new CommandQueue(queueParams.commandQueueSize_));
  sharedResources_->fragmentQueue_.
    reset(new FragmentQueue(queueParams.fragmentQueueSize_, queueParams.fragmentQueueMemoryLimitMB_ * 1024*1024));
  sharedResources_->registrationQueue_.
    reset(new RegistrationQueue(queueParams.registrationQueueSize_));
  sharedResources_->streamQueue_.
    reset(new StreamQueue(queueParams.streamQueueSize_, queueParams.streamQueueMemoryLimitMB_ * 1024*1024));
  sharedResources_->dqmEventQueue_.
    reset(new DQMEventQueue(queueParams.dqmEventQueueSize_, queueParams.dqmEventQueueMemoryLimitMB_ * 1024*1024));

  // Alarm handling, statistics reporting and the remaining resource holders.
  sharedResources_->alarmHandler_.reset( new AlarmHandler(this, sharedResources_) );
  sharedResources_->statisticsReporter_.reset(
    new StatisticsReporter(this, sharedResources_)
  );
  sharedResources_->initMsgCollection_.reset(new InitMsgCollection());
  sharedResources_->diskWriterResources_.reset(new DiskWriterResources());
  sharedResources_->dqmEventProcessorResources_.reset(new DQMEventProcessorResources());

  // Hand the fragment, stream and DQM event queues to the throughput monitor.
  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setFragmentQueue(sharedResources_->fragmentQueue_);
  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setStreamQueue(sharedResources_->streamQueue_);
  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setDQMEventQueue(sharedResources_->dqmEventQueue_);

  // The discard manager is given the data sender monitor collection.
  sharedResources_->
    discardManager_.reset(new DiscardManager(getApplicationContext(),
                                             getApplicationDescriptor(),
                                             sharedResources_->statisticsReporter_->
                                             getDataSenderMonitorCollection()));

  // Consumer bookkeeping: registrations plus the event and DQM event queue
  // collections, each constructed with its consumer monitor collection.
  sharedResources_->registrationCollection_.reset( new RegistrationCollection() );
  EventConsumerMonitorCollection& ecmc = 
    sharedResources_->statisticsReporter_->getEventConsumerMonitorCollection();
  sharedResources_->eventQueueCollection_.reset( new EventQueueCollection( ecmc ) );

  DQMConsumerMonitorCollection& dcmc = 
    sharedResources_->statisticsReporter_->getDQMConsumerMonitorCollection();
  sharedResources_->dqmEventQueueCollection_.reset( new DQMEventQueueCollection( dcmc ) );

  // Helper serving the consumer callbacks.
  consumerUtils_.reset( new ConsumerUtils_t(
      sharedResources_->configuration_,
      sharedResources_->registrationCollection_,
      sharedResources_->registrationQueue_,
      sharedResources_->initMsgCollection_,
      sharedResources_->eventQueueCollection_,
      sharedResources_->dqmEventQueueCollection_,
      sharedResources_->alarmHandler_
    ) );

  // Helper serving the web page callbacks.
  smWebPageHelper_.reset( new SMWebPageHelper(
      getApplicationDescriptor(), sharedResources_));

}
void StorageManager::inputWebPage ( xgi::Input *  in,
xgi::Output *  out 
) throw (xgi::exception::Exception) [private]

Webinterface callback creating web page showing the I2O input information

Definition at line 371 of file StorageManager.cc.

References alignCSCRings::e, exception, Exception, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

{
  // Rendering is delegated to the web page helper; any failure is logged and
  // raised as xgi::exception::Exception with a descriptive text.
  std::string failureText( "Failed to create the I2O input webpage" );

  try
  {
    smWebPageHelper_->inputWebPage( out );
  }
  catch ( const std::exception& ex )
  {
    failureText.append( ": " ).append( ex.what() );
    LOG4CPLUS_ERROR( getApplicationLogger(), failureText );
    XCEPT_RAISE( xgi::exception::Exception, failureText );
  }
  catch ( ... )
  {
    failureText.append( ": Unknown exception" );
    LOG4CPLUS_ERROR( getApplicationLogger(), failureText );
    XCEPT_RAISE( xgi::exception::Exception, failureText );
  }
}
StorageManager& stor::StorageManager::operator= ( StorageManager const &  ) [private]
void StorageManager::processConsumerEventRequest ( xgi::Input *  in,
xgi::Output *  out 
) throw ( xgi::exception::Exception ) [private]

Callback handling event consumer event request

Definition at line 718 of file StorageManager.cc.

References recoMuon::in, and dbtoconf::out.

Referenced by bindConsumerCallbacks().

{
  // Forward the request unchanged to the consumer utilities.
  consumerUtils_->processConsumerEventRequest(in,out);
}
void StorageManager::processConsumerHeaderRequest ( xgi::Input *  in,
xgi::Output *  out 
) throw ( xgi::exception::Exception ) [private]

Callback handling event consumer init message request

Definition at line 710 of file StorageManager.cc.

References recoMuon::in, and dbtoconf::out.

Referenced by bindConsumerCallbacks().

{
  // Forward the request unchanged to the consumer utilities.
  consumerUtils_->processConsumerHeaderRequest(in,out);
}
void StorageManager::processConsumerRegistrationRequest ( xgi::Input *  in,
xgi::Output *  out 
) throw ( xgi::exception::Exception ) [private]

Callback handling event consumer registration request

Definition at line 702 of file StorageManager.cc.

References recoMuon::in, and dbtoconf::out.

Referenced by bindConsumerCallbacks().

{
  // Forward the request unchanged to the consumer utilities.
  consumerUtils_->processConsumerRegistrationRequest(in,out);
}
void StorageManager::processDQMConsumerEventRequest ( xgi::Input *  in,
xgi::Output *  out 
) throw ( xgi::exception::Exception ) [private]

Callback handling DQM event consumer DQM event request

Definition at line 734 of file StorageManager.cc.

References recoMuon::in, and dbtoconf::out.

Referenced by bindConsumerCallbacks().

{
  // Forward the request unchanged to the consumer utilities.
  consumerUtils_->processDQMConsumerEventRequest(in,out);
}
void StorageManager::processDQMConsumerRegistrationRequest ( xgi::Input *  in,
xgi::Output *  out 
) throw ( xgi::exception::Exception ) [private]

Callback handling DQM event consumer registration request

Definition at line 726 of file StorageManager.cc.

References recoMuon::in, and dbtoconf::out.

Referenced by bindConsumerCallbacks().

{
  // Forward the request unchanged to the consumer utilities.
  consumerUtils_->processDQMConsumerRegistrationRequest(in,out);
}
void StorageManager::rbsenderDetailWebPage ( xgi::Input *  in,
xgi::Output *  out 
) throw (xgi::exception::Exception) [private]

Webinterface callback creating web page showing detailed information about the resource broker sending data.

Definition at line 482 of file StorageManager.cc.

References alignCSCRings::e, exception, Exception, recoMuon::in, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

{
  // Renders the detail page for one resource broker; any failure is logged
  // and raised as xgi::exception::Exception with a descriptive text.
  std::string failureText( "Failed to create the data sender webpage" );

  try
  {
    // The resource broker is selected by the optional "id" form element;
    // when it is absent the id stays at 0.
    cgicc::Cgicc cgi( in );
    long long rbId = 0;
    const cgicc::const_form_iterator idField = cgi.getElement( "id" );
    if ( idField != cgi.getElements().end() )
    {
      rbId = boost::lexical_cast<long long>( idField->getValue() );
    }

    smWebPageHelper_->resourceBrokerDetail( out, rbId );
  }
  catch ( const std::exception& ex )
  {
    failureText.append( ": " ).append( ex.what() );
    LOG4CPLUS_ERROR( getApplicationLogger(), failureText );
    XCEPT_RAISE( xgi::exception::Exception, failureText );
  }
  catch ( ... )
  {
    failureText.append( ": Unknown exception" );
    LOG4CPLUS_ERROR( getApplicationLogger(), failureText );
    XCEPT_RAISE( xgi::exception::Exception, failureText );
  }
}
void StorageManager::rbsenderWebPage ( xgi::Input *  in,
xgi::Output *  out 
) throw (xgi::exception::Exception) [private]

Webinterface callback creating web page showing summary information about the resource broker sending data.

Definition at line 454 of file StorageManager.cc.

References alignCSCRings::e, exception, Exception, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

{
  // Rendering is delegated to the web page helper; any failure is logged and
  // raised as xgi::exception::Exception with a descriptive text.
  std::string failureText( "Failed to create the data sender webpage" );

  try
  {
    smWebPageHelper_->resourceBrokerOverview( out );
  }
  catch ( const std::exception& ex )
  {
    failureText.append( ": " ).append( ex.what() );
    LOG4CPLUS_ERROR( getApplicationLogger(), failureText );
    XCEPT_RAISE( xgi::exception::Exception, failureText );
  }
  catch ( ... )
  {
    failureText.append( ": Unknown exception" );
    LOG4CPLUS_ERROR( getApplicationLogger(), failureText );
    XCEPT_RAISE( xgi::exception::Exception, failureText );
  }
}
void StorageManager::receiveDataMessage ( toolbox::mem::Reference *  ref) [private]

Callback for I2O message containing an event

Definition at line 272 of file StorageManager.cc.

References stor::FragmentMonitorCollection::addEventFragmentSample(), alignCSCRings::r, sharedResources_, and stor::I2OChain::totalDataSize().

Referenced by bindI2OCallbacks().

{
  // Wrap the received buffer reference in an I2OChain.
  I2OChain chain( ref );

  // Record the size of this event fragment.
  FragmentMonitorCollection& fragmentMonitor =
    sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
  fragmentMonitor.addEventFragmentSample( chain.totalDataSize() );

  // Put the chain on the fragment queue.
  sharedResources_->fragmentQueue_->enqWait( chain );

#ifdef STOR_DEBUG_DUPLICATE_MESSAGES
  // Debug aid: for a random draw below 0.001, feed a duplicate of the
  // message back into this callback.
  const double draw = rand() / static_cast<double>( RAND_MAX );
  if ( draw < 0.001 )
  {
    LOG4CPLUS_INFO( this->getApplicationLogger(), "Simulating duplicated data message" );
    receiveDataMessage( ref->duplicate() );
  }
#endif
}
void StorageManager::receiveDQMMessage ( toolbox::mem::Reference *  ref) [private]

Callback for I2O message containing a DQM event (histograms)

Definition at line 305 of file StorageManager.cc.

References stor::FragmentMonitorCollection::addDQMEventFragmentSample(), sharedResources_, and stor::I2OChain::totalDataSize().

Referenced by bindI2OCallbacks().

{
  // Wrap the received buffer reference in an I2OChain.
  I2OChain chain( ref );

  // Record the size of this DQM event fragment.
  FragmentMonitorCollection& fragmentMonitor =
    sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
  fragmentMonitor.addDQMEventFragmentSample( chain.totalDataSize() );

  // Put the chain on the fragment queue.
  sharedResources_->fragmentQueue_->enqWait( chain );
}
void StorageManager::receiveEndOfLumiSectionMessage ( toolbox::mem::Reference *  ref) [private]

Callback for I2O message notifying the end-of-lumi-section

Definition at line 317 of file StorageManager.cc.

References stor::FragmentMonitorCollection::addFragmentSample(), stor::MonitoredQuantity::addSample(), stor::RunMonitorCollection::getEoLSSeenMQ(), stor::I2OChain::lumiSection(), sharedResources_, and stor::I2OChain::totalDataSize().

Referenced by bindI2OCallbacks().

{
  // Wrap the received buffer reference in an I2OChain.
  I2OChain chain( ref );

  // Record the message size as a fragment sample.
  FragmentMonitorCollection& fragmentMonitor =
    sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
  fragmentMonitor.addFragmentSample( chain.totalDataSize() );

  // Record the lumi section carried by the message.
  RunMonitorCollection& runMonitor =
    sharedResources_->statisticsReporter_->getRunMonitorCollection();
  runMonitor.getEoLSSeenMQ().addSample( chain.lumiSection() );

  // Unlike the other receive callbacks, this message goes to the stream queue.
  sharedResources_->streamQueue_->enqWait( chain );
}
void StorageManager::receiveErrorDataMessage ( toolbox::mem::Reference *  ref) [private]

Callback for I2O message containing an error event

Definition at line 293 of file StorageManager.cc.

References stor::FragmentMonitorCollection::addEventFragmentSample(), sharedResources_, and stor::I2OChain::totalDataSize().

Referenced by bindI2OCallbacks().

{
  // Wrap the received buffer reference in an I2OChain.
  I2OChain chain( ref );

  // Error events are recorded with the same sample call as regular events.
  FragmentMonitorCollection& fragmentMonitor =
    sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
  fragmentMonitor.addEventFragmentSample( chain.totalDataSize() );

  // Put the chain on the fragment queue.
  sharedResources_->fragmentQueue_->enqWait( chain );
}
void StorageManager::receiveRegistryMessage ( toolbox::mem::Reference *  ref) [private]

Callback for I2O message containing an init message

Definition at line 253 of file StorageManager.cc.

References stor::MonitoredQuantity::addSample(), stor::FragmentMonitorCollection::getAllFragmentSizeMQ(), stor::ThroughputMonitorCollection::setMemoryPoolPointer(), sharedResources_, and stor::I2OChain::totalDataSize().

Referenced by bindI2OCallbacks().

{
  // Wrap the received buffer reference in an I2OChain.
  I2OChain chain( ref );

  // Set the I2O message pool pointer. Only done for init messages.
  ThroughputMonitorCollection& throughputMonitor =
    sharedResources_->statisticsReporter_->getThroughputMonitorCollection();
  throughputMonitor.setMemoryPoolPointer( ref->getBuffer()->getPool() );

  // Record the message size, scaled down by 0x100000.
  FragmentMonitorCollection& fragmentMonitor =
    sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
  const double scaledSize = static_cast<double>( chain.totalDataSize() ) / 0x100000;
  fragmentMonitor.getAllFragmentSizeMQ().addSample( scaledSize );

  // Put the chain on the fragment queue.
  sharedResources_->fragmentQueue_->enqWait( chain );
}
void StorageManager::startWorkerThreads ( ) [private]

Create and start all worker threads

Definition at line 215 of file StorageManager.cc.

References diskWriter_, dqmEventProcessor_, alignCSCRings::e, exception, Exception, fragmentProcessor_, and sharedResources_.

Referenced by StorageManager().

{
  // Creates the fragment processor, disk writer and DQM event processor and
  // starts their work loops together with the statistics reporter's. This
  // function does not throw on failure: every exception is passed to the
  // alarm handler's moveToFailedState() instead.

  // Start the workloops
  try
  {
    // All three workers are created before any work loop is started.
    fragmentProcessor_.reset( new FragmentProcessor( this, sharedResources_ ) );
    diskWriter_.reset( new DiskWriter(this, sharedResources_) );
    dqmEventProcessor_.reset( new DQMEventProcessor(this, sharedResources_) );
    sharedResources_->statisticsReporter_->startWorkLoop("theStatisticsReporter");
    fragmentProcessor_->startWorkLoop("theFragmentProcessor");
    diskWriter_->startWorkLoop("theDiskWriter");
    dqmEventProcessor_->startWorkLoop("theDQMEventProcessor");
  }
  catch(xcept::Exception &e)
  {
    // xcept exceptions are forwarded as they are.
    sharedResources_->alarmHandler_->moveToFailedState( e );
  }
  catch(std::exception &e)
  {
    // Other standard exceptions are wrapped into a stor exception first.
    XCEPT_DECLARE(stor::exception::Exception,
      sentinelException, e.what());
    sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
  }
  catch(...)
  {
    std::string errorMsg = "Unknown exception when starting the workloops";
    XCEPT_DECLARE(stor::exception::Exception,
      sentinelException, errorMsg);
    sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
  }
}
void StorageManager::storedDataWebPage ( xgi::Input *  in,
xgi::Output *  out 
) throw (xgi::exception::Exception) [private]

Webinterface callback creating web page showing the stored data information

Definition at line 398 of file StorageManager.cc.

References alignCSCRings::e, exception, Exception, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

{
  // Rendering is delegated to the web page helper; any failure is logged and
  // raised as xgi::exception::Exception with a descriptive text.
  std::string failureText( "Failed to create the stored data webpage" );

  try
  {
    smWebPageHelper_->storedDataWebPage( out );
  }
  catch ( const std::exception& ex )
  {
    failureText.append( ": " ).append( ex.what() );
    LOG4CPLUS_ERROR( getApplicationLogger(), failureText );
    XCEPT_RAISE( xgi::exception::Exception, failureText );
  }
  catch ( ... )
  {
    failureText.append( ": Unknown exception" );
    LOG4CPLUS_ERROR( getApplicationLogger(), failureText );
    XCEPT_RAISE( xgi::exception::Exception, failureText );
  }
}
void StorageManager::throughputWebPage ( xgi::Input *  in,
xgi::Output *  out 
) throw (xgi::exception::Exception) [private]

Webinterface callback creating web page showing statistics about the data throughput in the SM.

Definition at line 574 of file StorageManager.cc.

References alignCSCRings::e, exception, Exception, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

{
  // Rendering is delegated to the web page helper; any failure is logged and
  // raised as xgi::exception::Exception with a descriptive text.
  std::string failureText( "Failed to create the throughput statistics webpage" );

  try
  {
    smWebPageHelper_->throughputWebPage( out );
  }
  catch ( const std::exception& ex )
  {
    failureText.append( ": " ).append( ex.what() );
    LOG4CPLUS_ERROR( getApplicationLogger(), failureText );
    XCEPT_RAISE( xgi::exception::Exception, failureText );
  }
  catch ( ... )
  {
    failureText.append( ": Unknown exception" );
    LOG4CPLUS_ERROR( getApplicationLogger(), failureText );
    XCEPT_RAISE( xgi::exception::Exception, failureText );
  }
}

Member Data Documentation

Definition at line 231 of file StorageManager.h.

Referenced by initializeSharedResources().

boost::scoped_ptr<DiskWriter> stor::StorageManager::diskWriter_ [private]

Definition at line 227 of file StorageManager.h.

Referenced by startWorkerThreads().

Definition at line 228 of file StorageManager.h.

Referenced by startWorkerThreads().

Definition at line 226 of file StorageManager.h.

Referenced by startWorkerThreads().

Definition at line 232 of file StorageManager.h.

Referenced by initializeSharedResources().