#include <StorageManager.h>
Public Member Functions | |
StorageManager (xdaq::ApplicationStub *s) | |
Private Types | |
typedef ConsumerUtils < Configuration, EventQueueCollection > | ConsumerUtils_t |
Private Member Functions | |
void | bindConsumerCallbacks () |
void | bindI2OCallbacks () |
void | bindStateMachineCallbacks () |
void | bindWebInterfaceCallbacks () |
void | consumerListWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception) |
void | consumerStatisticsPage (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception ) |
void | css (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception) |
void | defaultWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception) |
void | dqmEventStatisticsWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception) |
void | fileStatisticsWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception) |
xoap::MessageReference | handleFSMSoapMessage (xoap::MessageReference) throw ( xoap::exception::Exception ) |
void | initializeSharedResources () |
void | inputWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception) |
StorageManager & | operator= (StorageManager const &) |
void | processConsumerEventRequest (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception ) |
void | processConsumerHeaderRequest (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception ) |
void | processConsumerRegistrationRequest (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception ) |
void | processDQMConsumerEventRequest (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception ) |
void | processDQMConsumerRegistrationRequest (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception ) |
void | rbsenderDetailWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception) |
void | rbsenderWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception) |
void | receiveDataMessage (toolbox::mem::Reference *ref) |
void | receiveDQMMessage (toolbox::mem::Reference *ref) |
void | receiveEndOfLumiSectionMessage (toolbox::mem::Reference *ref) |
void | receiveErrorDataMessage (toolbox::mem::Reference *ref) |
void | receiveRegistryMessage (toolbox::mem::Reference *ref) |
void | startWorkerThreads () |
StorageManager (StorageManager const &) | |
void | storedDataWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception) |
void | throughputWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception) |
Private Attributes | |
boost::scoped_ptr < ConsumerUtils_t > | consumerUtils_ |
boost::scoped_ptr< DiskWriter > | diskWriter_ |
boost::scoped_ptr < DQMEventProcessor > | dqmEventProcessor_ |
boost::scoped_ptr < FragmentProcessor > | fragmentProcessor_ |
SharedResourcesPtr | sharedResources_ |
boost::scoped_ptr < SMWebPageHelper > | smWebPageHelper_ |
Main class of the StorageManager XDAQ application
Definition at line 46 of file StorageManager.h.
typedef ConsumerUtils<Configuration,EventQueueCollection> stor::StorageManager::ConsumerUtils_t [private] |
Definition at line 230 of file StorageManager.h.
StorageManager::StorageManager | ( | xdaq::ApplicationStub * | s | ) |
Definition at line 39 of file StorageManager.cc.
References bindConsumerCallbacks(), bindI2OCallbacks(), bindStateMachineCallbacks(), bindWebInterfaceCallbacks(), alignCSCRings::e, exception, Exception, initializeSharedResources(), and startWorkerThreads().
: xdaq::Application(s) { LOG4CPLUS_INFO(this->getApplicationLogger(),"Making StorageManager"); // bind all callback functions bindI2OCallbacks(); bindStateMachineCallbacks(); bindWebInterfaceCallbacks(); bindConsumerCallbacks(); std::string errorMsg = "Exception in StorageManager constructor: "; try { initializeSharedResources(); } catch(std::exception &e) { errorMsg += e.what(); LOG4CPLUS_FATAL( getApplicationLogger(), e.what() ); XCEPT_RAISE( stor::exception::Exception, e.what() ); } catch(...) { errorMsg += "unknown exception"; LOG4CPLUS_FATAL( getApplicationLogger(), errorMsg ); XCEPT_RAISE( stor::exception::Exception, errorMsg ); } startWorkerThreads(); }
stor::StorageManager::StorageManager | ( | StorageManager const & | ) | [private] |
void StorageManager::bindConsumerCallbacks | ( | ) | [private] |
Bind callbacks for consumers
Definition at line 138 of file StorageManager.cc.
References processConsumerEventRequest(), processConsumerHeaderRequest(), processConsumerRegistrationRequest(), processDQMConsumerEventRequest(), and processDQMConsumerRegistrationRequest().
Referenced by StorageManager().
{ // event consumers xgi::bind( this, &StorageManager::processConsumerRegistrationRequest, "registerConsumer" ); xgi::bind( this, &StorageManager::processConsumerHeaderRequest, "getregdata" ); xgi::bind( this, &StorageManager::processConsumerEventRequest, "geteventdata" ); // dqm event consumers xgi::bind(this,&StorageManager::processDQMConsumerRegistrationRequest, "registerDQMConsumer"); xgi::bind(this,&StorageManager::processDQMConsumerEventRequest, "getDQMeventdata"); }
void StorageManager::bindI2OCallbacks | ( | ) | [private] |
Bind callbacks for I2O messages
Definition at line 72 of file StorageManager.cc.
References I2O_SM_DATA, I2O_SM_DQM, I2O_SM_ERROR, I2O_SM_PREAMBLE, receiveDataMessage(), receiveDQMMessage(), receiveEndOfLumiSectionMessage(), receiveErrorDataMessage(), and receiveRegistryMessage().
Referenced by StorageManager().
{ i2o::bind(this, &StorageManager::receiveRegistryMessage, I2O_SM_PREAMBLE, XDAQ_ORGANIZATION_ID); i2o::bind(this, &StorageManager::receiveDataMessage, I2O_SM_DATA, XDAQ_ORGANIZATION_ID); i2o::bind(this, &StorageManager::receiveErrorDataMessage, I2O_SM_ERROR, XDAQ_ORGANIZATION_ID); i2o::bind(this, &StorageManager::receiveDQMMessage, I2O_SM_DQM, XDAQ_ORGANIZATION_ID); i2o::bind(this, &StorageManager::receiveEndOfLumiSectionMessage, I2O_EVM_LUMISECTION, XDAQ_ORGANIZATION_ID); }
void StorageManager::bindStateMachineCallbacks | ( | ) | [private] |
Bind callbacks for state machine SOAP messages
Definition at line 97 of file StorageManager.cc.
References handleFSMSoapMessage().
Referenced by StorageManager().
{ xoap::bind( this, &StorageManager::handleFSMSoapMessage, "Configure", XDAQ_NS_URI ); xoap::bind( this, &StorageManager::handleFSMSoapMessage, "Enable", XDAQ_NS_URI ); xoap::bind( this, &StorageManager::handleFSMSoapMessage, "Stop", XDAQ_NS_URI ); xoap::bind( this, &StorageManager::handleFSMSoapMessage, "Halt", XDAQ_NS_URI ); xoap::bind( this, &StorageManager::handleFSMSoapMessage, "EmergencyStop", XDAQ_NS_URI ); }
void StorageManager::bindWebInterfaceCallbacks | ( | ) | [private] |
Bind callbacks for web interface
Definition at line 122 of file StorageManager.cc.
References consumerListWebPage(), consumerStatisticsPage(), css(), defaultWebPage(), dqmEventStatisticsWebPage(), fileStatisticsWebPage(), inputWebPage(), rbsenderDetailWebPage(), rbsenderWebPage(), storedDataWebPage(), and throughputWebPage().
Referenced by StorageManager().
{ xgi::bind(this,&StorageManager::css, "styles.css"); xgi::bind(this,&StorageManager::defaultWebPage, "Default"); xgi::bind(this,&StorageManager::inputWebPage, "input"); xgi::bind(this,&StorageManager::storedDataWebPage, "storedData"); xgi::bind(this,&StorageManager::rbsenderWebPage, "rbsenderlist"); xgi::bind(this,&StorageManager::rbsenderDetailWebPage, "rbsenderdetail"); xgi::bind(this,&StorageManager::fileStatisticsWebPage, "fileStatistics"); xgi::bind(this,&StorageManager::dqmEventStatisticsWebPage,"dqmEventStatistics"); xgi::bind(this,&StorageManager::consumerStatisticsPage, "consumerStatistics" ); xgi::bind(this,&StorageManager::consumerListWebPage, "consumerList"); xgi::bind(this,&StorageManager::throughputWebPage, "throughputStatistics"); }
void StorageManager::consumerListWebPage | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) [private] |
Callback returning an XML list of consumer information. The current implementation just returns an empty document.
Definition at line 603 of file StorageManager.cc.
References dbtoconf::out.
Referenced by bindWebInterfaceCallbacks().
void StorageManager::consumerStatisticsPage | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw ( xgi::exception::Exception ) [private] |
Webinterface callback creating web page showing the connected consumers
Definition at line 425 of file StorageManager.cc.
References alignCSCRings::e, exception, Exception, and dbtoconf::out.
Referenced by bindWebInterfaceCallbacks().
{ std::string err_msg = "Failed to create consumer statistics page"; try { smWebPageHelper_->consumerStatistics(out); } catch( std::exception &e ) { err_msg += ": "; err_msg += e.what(); LOG4CPLUS_ERROR( getApplicationLogger(), err_msg ); XCEPT_RAISE( xgi::exception::Exception, err_msg ); } catch(...) { err_msg += ": Unknown exception"; LOG4CPLUS_ERROR( getApplicationLogger(), err_msg ); XCEPT_RAISE( xgi::exception::Exception, err_msg ); } }
void StorageManager::css | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) [private] |
Webinterface callback for style sheet
Definition at line 337 of file StorageManager.cc.
References recoMuon::in, and dbtoconf::out.
Referenced by bindWebInterfaceCallbacks().
{ smWebPageHelper_->css(in,out); }
void StorageManager::defaultWebPage | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) [private] |
Webinterface callback creating default web page
Definition at line 344 of file StorageManager.cc.
References alignCSCRings::e, exception, Exception, and dbtoconf::out.
Referenced by bindWebInterfaceCallbacks().
{ std::string errorMsg = "Failed to create the default webpage"; try { smWebPageHelper_->defaultWebPage(out); } catch(std::exception &e) { errorMsg += ": "; errorMsg += e.what(); LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg); XCEPT_RAISE(xgi::exception::Exception, errorMsg); } catch(...) { errorMsg += ": Unknown exception"; LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg); XCEPT_RAISE(xgi::exception::Exception, errorMsg); } }
void StorageManager::dqmEventStatisticsWebPage | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) [private] |
Webinterface callback creating web page showing statistics about the processed DQM events.
Definition at line 547 of file StorageManager.cc.
References alignCSCRings::e, exception, Exception, and dbtoconf::out.
Referenced by bindWebInterfaceCallbacks().
{ std::string errorMsg = "Failed to create the DQM event statistics webpage"; try { smWebPageHelper_->dqmEventWebPage(out); } catch(std::exception &e) { errorMsg += ": "; errorMsg += e.what(); LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg); XCEPT_RAISE(xgi::exception::Exception, errorMsg); } catch(...) { errorMsg += ": Unknown exception"; LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg); XCEPT_RAISE(xgi::exception::Exception, errorMsg); } }
void StorageManager::fileStatisticsWebPage | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) [private] |
Webinterface callback creating web page showing information about recently written files
Definition at line 519 of file StorageManager.cc.
References alignCSCRings::e, exception, Exception, and dbtoconf::out.
Referenced by bindWebInterfaceCallbacks().
{ std::string errorMsg = "Failed to create the file statistics webpage"; try { smWebPageHelper_->filesWebPage(out); } catch(std::exception &e) { errorMsg += ": "; errorMsg += e.what(); LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg); XCEPT_RAISE(xgi::exception::Exception, errorMsg); } catch(...) { errorMsg += ": Unknown exception"; LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg); XCEPT_RAISE(xgi::exception::Exception, errorMsg); } }
xoap::MessageReference StorageManager::handleFSMSoapMessage | ( | xoap::MessageReference | msg | ) | throw ( xoap::exception::Exception ) [private] |
Callback for a SOAP message containing a state machine event, possibly including new configuration values
Definition at line 619 of file StorageManager.cc.
References edmPickEvents::command, stor::soaputils::createFsmSoapResponseMsg(), alignCSCRings::e, exception, Exception, cms::Exception::explainSelf(), stor::soaputils::extractParameters(), and lumiQueryAPI::msg.
Referenced by bindStateMachineCallbacks().
{ std::string errorMsg; xoap::MessageReference returnMsg; try { errorMsg = "Failed to extract FSM event and parameters from SOAP message: "; std::string command = soaputils::extractParameters(msg, this); errorMsg = "Failed to put a '" + command + "' state machine event into command queue: "; if (command == "Configure") { sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Configure() ) ); } else if (command == "Enable") { if (sharedResources_->configuration_->streamConfigurationHasChanged()) { sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Reconfigure() ) ); } sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Enable() ) ); } else if (command == "Stop") { sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Stop() ) ); } else if (command == "Halt") { sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Halt() ) ); } else if (command == "EmergencyStop") { sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::EmergencyStop() ) ); } else { XCEPT_RAISE(stor::exception::StateMachine, "Received an unknown state machine event '" + command + "'."); } errorMsg = "Failed to create FSM SOAP reply message: "; returnMsg = soaputils::createFsmSoapResponseMsg(command, sharedResources_->statisticsReporter_-> getStateMachineMonitorCollection().externallyVisibleState()); } catch (cms::Exception& e) { errorMsg += e.explainSelf(); XCEPT_DECLARE(xoap::exception::Exception, sentinelException, errorMsg); sharedResources_->alarmHandler_->moveToFailedState( sentinelException ); throw sentinelException; } catch (xcept::Exception &e) { XCEPT_DECLARE_NESTED(xoap::exception::Exception, sentinelException, errorMsg, e); sharedResources_->alarmHandler_->moveToFailedState( sentinelException ); throw sentinelException; } catch (std::exception& e) { errorMsg += e.what(); XCEPT_DECLARE(xoap::exception::Exception, sentinelException, errorMsg); 
sharedResources_->alarmHandler_->moveToFailedState( sentinelException ); throw sentinelException; } catch (...) { errorMsg += "Unknown exception"; XCEPT_DECLARE(xoap::exception::Exception, sentinelException, errorMsg); sharedResources_->alarmHandler_->moveToFailedState( sentinelException ); throw sentinelException; } return returnMsg; }
void StorageManager::initializeSharedResources | ( | ) | [private] |
Initialize the shared resources
Definition at line 151 of file StorageManager.cc.
References stor::QueueConfigurationParams::commandQueueSize_, edm::errors::Configuration, consumerUtils_, stor::QueueConfigurationParams::dqmEventQueueMemoryLimitMB_, stor::QueueConfigurationParams::dqmEventQueueSize_, stor::QueueConfigurationParams::fragmentQueueMemoryLimitMB_, stor::QueueConfigurationParams::fragmentQueueSize_, instance, stor::QueueConfigurationParams::registrationQueueSize_, reset(), sharedResources_, smWebPageHelper_, stor::QueueConfigurationParams::streamQueueMemoryLimitMB_, and stor::QueueConfigurationParams::streamQueueSize_.
Referenced by StorageManager().
{ sharedResources_.reset(new SharedResources()); xdata::InfoSpace *ispace = getApplicationInfoSpace(); unsigned long instance = getApplicationDescriptor()->getInstance(); sharedResources_->configuration_.reset(new Configuration(ispace, instance)); QueueConfigurationParams queueParams = sharedResources_->configuration_->getQueueConfigurationParams(); sharedResources_->commandQueue_. reset(new CommandQueue(queueParams.commandQueueSize_)); sharedResources_->fragmentQueue_. reset(new FragmentQueue(queueParams.fragmentQueueSize_, queueParams.fragmentQueueMemoryLimitMB_ * 1024*1024)); sharedResources_->registrationQueue_. reset(new RegistrationQueue(queueParams.registrationQueueSize_)); sharedResources_->streamQueue_. reset(new StreamQueue(queueParams.streamQueueSize_, queueParams.streamQueueMemoryLimitMB_ * 1024*1024)); sharedResources_->dqmEventQueue_. reset(new DQMEventQueue(queueParams.dqmEventQueueSize_, queueParams.dqmEventQueueMemoryLimitMB_ * 1024*1024)); sharedResources_->alarmHandler_.reset( new AlarmHandler(this, sharedResources_) ); sharedResources_->statisticsReporter_.reset( new StatisticsReporter(this, sharedResources_) ); sharedResources_->initMsgCollection_.reset(new InitMsgCollection()); sharedResources_->diskWriterResources_.reset(new DiskWriterResources()); sharedResources_->dqmEventProcessorResources_.reset(new DQMEventProcessorResources()); sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setFragmentQueue(sharedResources_->fragmentQueue_); sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setStreamQueue(sharedResources_->streamQueue_); sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setDQMEventQueue(sharedResources_->dqmEventQueue_); sharedResources_-> discardManager_.reset(new DiscardManager(getApplicationContext(), getApplicationDescriptor(), sharedResources_->statisticsReporter_-> getDataSenderMonitorCollection())); sharedResources_->registrationCollection_.reset( new 
RegistrationCollection() ); EventConsumerMonitorCollection& ecmc = sharedResources_->statisticsReporter_->getEventConsumerMonitorCollection(); sharedResources_->eventQueueCollection_.reset( new EventQueueCollection( ecmc ) ); DQMConsumerMonitorCollection& dcmc = sharedResources_->statisticsReporter_->getDQMConsumerMonitorCollection(); sharedResources_->dqmEventQueueCollection_.reset( new DQMEventQueueCollection( dcmc ) ); consumerUtils_.reset( new ConsumerUtils_t( sharedResources_->configuration_, sharedResources_->registrationCollection_, sharedResources_->registrationQueue_, sharedResources_->initMsgCollection_, sharedResources_->eventQueueCollection_, sharedResources_->dqmEventQueueCollection_, sharedResources_->alarmHandler_ ) ); smWebPageHelper_.reset( new SMWebPageHelper( getApplicationDescriptor(), sharedResources_)); }
void StorageManager::inputWebPage | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) [private] |
Webinterface callback creating web page showing the I2O input information
Definition at line 371 of file StorageManager.cc.
References alignCSCRings::e, exception, Exception, and dbtoconf::out.
Referenced by bindWebInterfaceCallbacks().
{ std::string errorMsg = "Failed to create the I2O input webpage"; try { smWebPageHelper_->inputWebPage(out); } catch(std::exception &e) { errorMsg += ": "; errorMsg += e.what(); LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg); XCEPT_RAISE(xgi::exception::Exception, errorMsg); } catch(...) { errorMsg += ": Unknown exception"; LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg); XCEPT_RAISE(xgi::exception::Exception, errorMsg); } }
StorageManager& stor::StorageManager::operator= | ( | StorageManager const & | ) | [private] |
void StorageManager::processConsumerEventRequest | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw ( xgi::exception::Exception ) [private] |
Callback handling event consumer event request
Definition at line 718 of file StorageManager.cc.
References recoMuon::in, and dbtoconf::out.
Referenced by bindConsumerCallbacks().
{ consumerUtils_->processConsumerEventRequest(in,out); }
void StorageManager::processConsumerHeaderRequest | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw ( xgi::exception::Exception ) [private] |
Callback handling event consumer init message request
Definition at line 710 of file StorageManager.cc.
References recoMuon::in, and dbtoconf::out.
Referenced by bindConsumerCallbacks().
{ consumerUtils_->processConsumerHeaderRequest(in,out); }
void StorageManager::processConsumerRegistrationRequest | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw ( xgi::exception::Exception ) [private] |
Callback handling event consumer registration request
Definition at line 702 of file StorageManager.cc.
References recoMuon::in, and dbtoconf::out.
Referenced by bindConsumerCallbacks().
{ consumerUtils_->processConsumerRegistrationRequest(in,out); }
void StorageManager::processDQMConsumerEventRequest | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw ( xgi::exception::Exception ) [private] |
Callback handling a DQM event request from a DQM event consumer
Definition at line 734 of file StorageManager.cc.
References recoMuon::in, and dbtoconf::out.
Referenced by bindConsumerCallbacks().
{ consumerUtils_->processDQMConsumerEventRequest(in,out); }
void StorageManager::processDQMConsumerRegistrationRequest | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw ( xgi::exception::Exception ) [private] |
Callback handling DQM event consumer registration request
Definition at line 726 of file StorageManager.cc.
References recoMuon::in, and dbtoconf::out.
Referenced by bindConsumerCallbacks().
{ consumerUtils_->processDQMConsumerRegistrationRequest(in,out); }
void StorageManager::rbsenderDetailWebPage | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) [private] |
Webinterface callback creating web page showing detailed information about the resource broker sending data.
Definition at line 482 of file StorageManager.cc.
References alignCSCRings::e, exception, Exception, recoMuon::in, and dbtoconf::out.
Referenced by bindWebInterfaceCallbacks().
{ std::string errorMsg = "Failed to create the data sender webpage"; try { long long localRBID = 0; cgicc::Cgicc cgiWrapper(in); cgicc::const_form_iterator updateRef = cgiWrapper.getElement("id"); if (updateRef != cgiWrapper.getElements().end()) { std::string idString = updateRef->getValue(); localRBID = boost::lexical_cast<long long>(idString); } smWebPageHelper_->resourceBrokerDetail(out, localRBID); } catch(std::exception &e) { errorMsg += ": "; errorMsg += e.what(); LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg); XCEPT_RAISE(xgi::exception::Exception, errorMsg); } catch(...) { errorMsg += ": Unknown exception"; LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg); XCEPT_RAISE(xgi::exception::Exception, errorMsg); } }
void StorageManager::rbsenderWebPage | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) [private] |
Webinterface callback creating web page showing summary information about the resource broker sending data.
Definition at line 454 of file StorageManager.cc.
References alignCSCRings::e, exception, Exception, and dbtoconf::out.
Referenced by bindWebInterfaceCallbacks().
{ std::string errorMsg = "Failed to create the data sender webpage"; try { smWebPageHelper_->resourceBrokerOverview(out); } catch(std::exception &e) { errorMsg += ": "; errorMsg += e.what(); LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg); XCEPT_RAISE(xgi::exception::Exception, errorMsg); } catch(...) { errorMsg += ": Unknown exception"; LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg); XCEPT_RAISE(xgi::exception::Exception, errorMsg); } }
void StorageManager::receiveDataMessage | ( | toolbox::mem::Reference * | ref | ) | [private] |
Callback for I2O message containing an event
Definition at line 272 of file StorageManager.cc.
References stor::FragmentMonitorCollection::addEventFragmentSample(), alignCSCRings::r, sharedResources_, and stor::I2OChain::totalDataSize().
Referenced by bindI2OCallbacks().
{ I2OChain i2oChain(ref); FragmentMonitorCollection& fragMonCollection = sharedResources_->statisticsReporter_->getFragmentMonitorCollection(); fragMonCollection.addEventFragmentSample( i2oChain.totalDataSize() ); sharedResources_->fragmentQueue_->enqWait(i2oChain); #ifdef STOR_DEBUG_DUPLICATE_MESSAGES double r = rand()/static_cast<double>(RAND_MAX); if (r < 0.001) { LOG4CPLUS_INFO(this->getApplicationLogger(), "Simulating duplicated data message"); receiveDataMessage(ref->duplicate()); } #endif }
void StorageManager::receiveDQMMessage | ( | toolbox::mem::Reference * | ref | ) | [private] |
Callback for I2O message containing a DQM event (histograms)
Definition at line 305 of file StorageManager.cc.
References stor::FragmentMonitorCollection::addDQMEventFragmentSample(), sharedResources_, and stor::I2OChain::totalDataSize().
Referenced by bindI2OCallbacks().
{ I2OChain i2oChain(ref); FragmentMonitorCollection& fragMonCollection = sharedResources_->statisticsReporter_->getFragmentMonitorCollection(); fragMonCollection.addDQMEventFragmentSample( i2oChain.totalDataSize() ); sharedResources_->fragmentQueue_->enqWait(i2oChain); }
void StorageManager::receiveEndOfLumiSectionMessage | ( | toolbox::mem::Reference * | ref | ) | [private] |
Callback for I2O message notifying the end-of-lumi-section
Definition at line 317 of file StorageManager.cc.
References stor::FragmentMonitorCollection::addFragmentSample(), stor::MonitoredQuantity::addSample(), stor::RunMonitorCollection::getEoLSSeenMQ(), stor::I2OChain::lumiSection(), sharedResources_, and stor::I2OChain::totalDataSize().
Referenced by bindI2OCallbacks().
{ I2OChain i2oChain( ref ); FragmentMonitorCollection& fragMonCollection = sharedResources_->statisticsReporter_->getFragmentMonitorCollection(); fragMonCollection.addFragmentSample( i2oChain.totalDataSize() ); RunMonitorCollection& runMonCollection = sharedResources_->statisticsReporter_->getRunMonitorCollection(); runMonCollection.getEoLSSeenMQ().addSample( i2oChain.lumiSection() ); sharedResources_->streamQueue_->enqWait( i2oChain ); }
void StorageManager::receiveErrorDataMessage | ( | toolbox::mem::Reference * | ref | ) | [private] |
Callback for I2O message containing an error event
Definition at line 293 of file StorageManager.cc.
References stor::FragmentMonitorCollection::addEventFragmentSample(), sharedResources_, and stor::I2OChain::totalDataSize().
Referenced by bindI2OCallbacks().
{ I2OChain i2oChain(ref); FragmentMonitorCollection& fragMonCollection = sharedResources_->statisticsReporter_->getFragmentMonitorCollection(); fragMonCollection.addEventFragmentSample( i2oChain.totalDataSize() ); sharedResources_->fragmentQueue_->enqWait(i2oChain); }
void StorageManager::receiveRegistryMessage | ( | toolbox::mem::Reference * | ref | ) | [private] |
Callback for I2O message containing an init message
Definition at line 253 of file StorageManager.cc.
References stor::MonitoredQuantity::addSample(), stor::FragmentMonitorCollection::getAllFragmentSizeMQ(), stor::ThroughputMonitorCollection::setMemoryPoolPointer(), sharedResources_, and stor::I2OChain::totalDataSize().
Referenced by bindI2OCallbacks().
{ I2OChain i2oChain(ref); // Set the I2O message pool pointer. Only done for init messages. ThroughputMonitorCollection& throughputMonCollection = sharedResources_->statisticsReporter_->getThroughputMonitorCollection(); throughputMonCollection.setMemoryPoolPointer( ref->getBuffer()->getPool() ); FragmentMonitorCollection& fragMonCollection = sharedResources_->statisticsReporter_->getFragmentMonitorCollection(); fragMonCollection.getAllFragmentSizeMQ().addSample( static_cast<double>( i2oChain.totalDataSize() ) / 0x100000 ); sharedResources_->fragmentQueue_->enqWait(i2oChain); }
void StorageManager::startWorkerThreads | ( | ) | [private] |
Create and start all worker threads
Definition at line 215 of file StorageManager.cc.
References diskWriter_, dqmEventProcessor_, alignCSCRings::e, exception, Exception, fragmentProcessor_, and sharedResources_.
Referenced by StorageManager().
{ // Start the workloops try { fragmentProcessor_.reset( new FragmentProcessor( this, sharedResources_ ) ); diskWriter_.reset( new DiskWriter(this, sharedResources_) ); dqmEventProcessor_.reset( new DQMEventProcessor(this, sharedResources_) ); sharedResources_->statisticsReporter_->startWorkLoop("theStatisticsReporter"); fragmentProcessor_->startWorkLoop("theFragmentProcessor"); diskWriter_->startWorkLoop("theDiskWriter"); dqmEventProcessor_->startWorkLoop("theDQMEventProcessor"); } catch(xcept::Exception &e) { sharedResources_->alarmHandler_->moveToFailedState( e ); } catch(std::exception &e) { XCEPT_DECLARE(stor::exception::Exception, sentinelException, e.what()); sharedResources_->alarmHandler_->moveToFailedState( sentinelException ); } catch(...) { std::string errorMsg = "Unknown exception when starting the workloops"; XCEPT_DECLARE(stor::exception::Exception, sentinelException, errorMsg); sharedResources_->alarmHandler_->moveToFailedState( sentinelException ); } }
void StorageManager::storedDataWebPage | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) [private] |
Webinterface callback creating web page showing the stored data information
Definition at line 398 of file StorageManager.cc.
References alignCSCRings::e, exception, Exception, and dbtoconf::out.
Referenced by bindWebInterfaceCallbacks().
{ std::string errorMsg = "Failed to create the stored data webpage"; try { smWebPageHelper_->storedDataWebPage(out); } catch(std::exception &e) { errorMsg += ": "; errorMsg += e.what(); LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg); XCEPT_RAISE(xgi::exception::Exception, errorMsg); } catch(...) { errorMsg += ": Unknown exception"; LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg); XCEPT_RAISE(xgi::exception::Exception, errorMsg); } }
void StorageManager::throughputWebPage | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) [private] |
Webinterface callback creating web page showing statistics about the data throughput in the SM.
Definition at line 574 of file StorageManager.cc.
References alignCSCRings::e, exception, Exception, and dbtoconf::out.
Referenced by bindWebInterfaceCallbacks().
{ std::string errorMsg = "Failed to create the throughput statistics webpage"; try { smWebPageHelper_->throughputWebPage(out); } catch(std::exception &e) { errorMsg += ": "; errorMsg += e.what(); LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg); XCEPT_RAISE(xgi::exception::Exception, errorMsg); } catch(...) { errorMsg += ": Unknown exception"; LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg); XCEPT_RAISE(xgi::exception::Exception, errorMsg); } }
boost::scoped_ptr<ConsumerUtils_t> stor::StorageManager::consumerUtils_ [private] |
Definition at line 231 of file StorageManager.h.
Referenced by initializeSharedResources().
boost::scoped_ptr<DiskWriter> stor::StorageManager::diskWriter_ [private] |
Definition at line 227 of file StorageManager.h.
Referenced by startWorkerThreads().
boost::scoped_ptr<DQMEventProcessor> stor::StorageManager::dqmEventProcessor_ [private] |
Definition at line 228 of file StorageManager.h.
Referenced by startWorkerThreads().
boost::scoped_ptr<FragmentProcessor> stor::StorageManager::fragmentProcessor_ [private] |
Definition at line 226 of file StorageManager.h.
Referenced by startWorkerThreads().
SharedResourcesPtr stor::StorageManager::sharedResources_ [private] |
Definition at line 224 of file StorageManager.h.
Referenced by initializeSharedResources(), receiveDataMessage(), receiveDQMMessage(), receiveEndOfLumiSectionMessage(), receiveErrorDataMessage(), receiveRegistryMessage(), and startWorkerThreads().
boost::scoped_ptr<SMWebPageHelper> stor::StorageManager::smWebPageHelper_ [private] |
Definition at line 232 of file StorageManager.h.
Referenced by initializeSharedResources().