CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Types | Private Member Functions | Private Attributes
stor::StorageManager Class Reference

#include <StorageManager.h>

Inheritance diagram for stor::StorageManager:

Public Member Functions

 StorageManager (xdaq::ApplicationStub *s)
 

Private Types

typedef ConsumerUtils
< Configuration,
EventQueueCollection >
ConsumerUtils_t
 

Private Member Functions

void bindConsumerCallbacks ()
 
void bindI2OCallbacks ()
 
void bindStateMachineCallbacks ()
 
void bindWebInterfaceCallbacks ()
 
void consumerListWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void consumerStatisticsPage (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception )
 
void css (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void defaultWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void dqmEventStatisticsWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void fileStatisticsWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
xoap::MessageReference handleFSMSoapMessage (xoap::MessageReference) throw ( xoap::exception::Exception )
 
void initializeSharedResources ()
 
StorageManager & operator= (StorageManager const &)
 
void processConsumerEventRequest (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception )
 
void processConsumerHeaderRequest (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception )
 
void processConsumerRegistrationRequest (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception )
 
void processDQMConsumerEventRequest (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception )
 
void processDQMConsumerRegistrationRequest (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception )
 
void rbsenderDetailWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void rbsenderWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void receiveDataMessage (toolbox::mem::Reference *ref)
 
void receiveDQMMessage (toolbox::mem::Reference *ref)
 
void receiveEndOfLumiSectionMessage (toolbox::mem::Reference *ref)
 
void receiveErrorDataMessage (toolbox::mem::Reference *ref)
 
void receiveRegistryMessage (toolbox::mem::Reference *ref)
 
void startWorkerThreads ()
 
 StorageManager (StorageManager const &)
 
void storedDataWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void throughputWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 

Private Attributes

boost::scoped_ptr
< ConsumerUtils_t >
consumerUtils_
 
boost::scoped_ptr< DiskWriter > diskWriter_
 
boost::scoped_ptr
< DQMEventProcessor >
dqmEventProcessor_
 
boost::scoped_ptr
< FragmentProcessor >
fragmentProcessor_
 
SharedResourcesPtr sharedResources_
 
boost::scoped_ptr
< SMWebPageHelper >
smWebPageHelper_
 

Detailed Description

Main class of the StorageManager XDAQ application

Author:
mommsen
Revision:
1.58.6.1
Date:
2011/03/07 11:33:04

Definition at line 44 of file StorageManager.h.

Member Typedef Documentation

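typedef ConsumerUtils< Configuration, EventQueueCollection > stor::StorageManager::ConsumerUtils_t
private

Definition at line 222 of file StorageManager.h.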

Constructor & Destructor Documentation

StorageManager::StorageManager ( xdaq::ApplicationStub *  s)

Definition at line 39 of file StorageManager.cc.

References bindConsumerCallbacks(), bindI2OCallbacks(), bindStateMachineCallbacks(), bindWebInterfaceCallbacks(), ExpressReco_HICollisions_FallBack::e, cmsCodeRules.cppFunctionSkipper::exception, edm::hlt::Exception, initializeSharedResources(), and startWorkerThreads().

39  :
40  xdaq::Application(s)
41 {
42  LOG4CPLUS_INFO(this->getApplicationLogger(),"Making StorageManager");
43 
44  // bind all callback functions
49 
50  std::string errorMsg = "Exception in StorageManager constructor: ";
51  try
52  {
54  }
55  catch(std::exception &e)
56  {
57  errorMsg += e.what();
58  LOG4CPLUS_FATAL( getApplicationLogger(), e.what() );
59  XCEPT_RAISE( stor::exception::Exception, e.what() );
60  }
61  catch(...)
62  {
63  errorMsg += "unknown exception";
64  LOG4CPLUS_FATAL( getApplicationLogger(), errorMsg );
65  XCEPT_RAISE( stor::exception::Exception, errorMsg );
66  }
67 
69 }
string s
Definition: asciidump.py:422
stor::StorageManager::StorageManager ( StorageManager const &  )
private

Member Function Documentation

void StorageManager::bindConsumerCallbacks ( )
private

Bind callbacks for consumers

Definition at line 137 of file StorageManager.cc.

References processConsumerEventRequest(), processConsumerHeaderRequest(), processConsumerRegistrationRequest(), processDQMConsumerEventRequest(), and processDQMConsumerRegistrationRequest().

Referenced by StorageManager().

138 {
139  // event consumers
140  xgi::bind( this, &StorageManager::processConsumerRegistrationRequest, "registerConsumer" );
141  xgi::bind( this, &StorageManager::processConsumerHeaderRequest, "getregdata" );
142  xgi::bind( this, &StorageManager::processConsumerEventRequest, "geteventdata" );
143 
144  // dqm event consumers
145  xgi::bind(this,&StorageManager::processDQMConsumerRegistrationRequest, "registerDQMConsumer");
146  xgi::bind(this,&StorageManager::processDQMConsumerEventRequest, "getDQMeventdata");
147 }
void processConsumerRegistrationRequest(xgi::Input *in, xgi::Output *out)
void processConsumerHeaderRequest(xgi::Input *in, xgi::Output *out)
void processDQMConsumerEventRequest(xgi::Input *in, xgi::Output *out)
void processDQMConsumerRegistrationRequest(xgi::Input *in, xgi::Output *out)
void processConsumerEventRequest(xgi::Input *in, xgi::Output *out)
void StorageManager::bindI2OCallbacks ( )
private

Bind callbacks for I2O message

Definition at line 72 of file StorageManager.cc.

References I2O_SM_DATA, I2O_SM_DQM, I2O_SM_ERROR, I2O_SM_PREAMBLE, receiveDataMessage(), receiveDQMMessage(), receiveEndOfLumiSectionMessage(), receiveErrorDataMessage(), and receiveRegistryMessage().

Referenced by StorageManager().

73 {
74  i2o::bind(this,
77  XDAQ_ORGANIZATION_ID);
78  i2o::bind(this,
81  XDAQ_ORGANIZATION_ID);
82  i2o::bind(this,
85  XDAQ_ORGANIZATION_ID);
86  i2o::bind(this,
88  I2O_SM_DQM,
89  XDAQ_ORGANIZATION_ID);
90  i2o::bind(this,
92  I2O_EVM_LUMISECTION,
93  XDAQ_ORGANIZATION_ID);
94 }
#define I2O_SM_ERROR
Definition: i2oEvfMsgs.h:23
#define I2O_SM_DQM
Definition: i2oEvfMsgs.h:25
void receiveEndOfLumiSectionMessage(toolbox::mem::Reference *ref)
#define I2O_SM_PREAMBLE
Definition: i2oEvfMsgs.h:21
void receiveDataMessage(toolbox::mem::Reference *ref)
void receiveErrorDataMessage(toolbox::mem::Reference *ref)
void receiveRegistryMessage(toolbox::mem::Reference *ref)
#define I2O_SM_DATA
Definition: i2oEvfMsgs.h:22
void receiveDQMMessage(toolbox::mem::Reference *ref)
void StorageManager::bindStateMachineCallbacks ( )
private

Bind callbacks for state machine SOAP messages

Definition at line 97 of file StorageManager.cc.

References handleFSMSoapMessage().

Referenced by StorageManager().

98 {
99  xoap::bind( this,
101  "Configure",
102  XDAQ_NS_URI );
103  xoap::bind( this,
105  "Enable",
106  XDAQ_NS_URI );
107  xoap::bind( this,
109  "Stop",
110  XDAQ_NS_URI );
111  xoap::bind( this,
113  "Halt",
114  XDAQ_NS_URI );
115  xoap::bind( this,
117  "EmergencyStop",
118  XDAQ_NS_URI );
119 }
xoap::MessageReference handleFSMSoapMessage(xoap::MessageReference)
void StorageManager::bindWebInterfaceCallbacks ( )
private

Bind callbacks for web interface

Definition at line 122 of file StorageManager.cc.

References consumerListWebPage(), consumerStatisticsPage(), css(), defaultWebPage(), dqmEventStatisticsWebPage(), fileStatisticsWebPage(), rbsenderDetailWebPage(), rbsenderWebPage(), storedDataWebPage(), and throughputWebPage().

Referenced by StorageManager().

123 {
124  xgi::bind(this,&StorageManager::css, "styles.css");
125  xgi::bind(this,&StorageManager::defaultWebPage, "Default");
126  xgi::bind(this,&StorageManager::storedDataWebPage, "storedData");
127  xgi::bind(this,&StorageManager::rbsenderWebPage, "rbsenderlist");
128  xgi::bind(this,&StorageManager::rbsenderDetailWebPage, "rbsenderdetail");
129  xgi::bind(this,&StorageManager::fileStatisticsWebPage, "fileStatistics");
130  xgi::bind(this,&StorageManager::dqmEventStatisticsWebPage,"dqmEventStatistics");
131  xgi::bind(this,&StorageManager::consumerStatisticsPage, "consumerStatistics" );
132  xgi::bind(this,&StorageManager::consumerListWebPage, "consumerList");
133  xgi::bind(this,&StorageManager::throughputWebPage, "throughputStatistics");
134 }
void consumerStatisticsPage(xgi::Input *in, xgi::Output *out)
void consumerListWebPage(xgi::Input *in, xgi::Output *out)
void fileStatisticsWebPage(xgi::Input *in, xgi::Output *out)
void rbsenderDetailWebPage(xgi::Input *in, xgi::Output *out)
void storedDataWebPage(xgi::Input *in, xgi::Output *out)
void css(xgi::Input *in, xgi::Output *out)
void dqmEventStatisticsWebPage(xgi::Input *in, xgi::Output *out)
void rbsenderWebPage(xgi::Input *in, xgi::Output *out)
void defaultWebPage(xgi::Input *in, xgi::Output *out)
void throughputWebPage(xgi::Input *in, xgi::Output *out)
void StorageManager::consumerListWebPage ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Callback returning an XML list of consumer information. The current implementation just returns an empty document.

Definition at line 574 of file StorageManager.cc.

References dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

576 {
577  out->getHTTPResponseHeader().addHeader( "Content-Type",
578  "application/octet-stream" );
579  out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
580  "binary" );
581  char buff;
582  out->write( &buff, 0 );
583 }
tuple out
Definition: dbtoconf.py:99
void StorageManager::consumerStatisticsPage ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Webinterface callback creating web page showing the connected consumers

Definition at line 396 of file StorageManager.cc.

References ExpressReco_HICollisions_FallBack::e, cmsCodeRules.cppFunctionSkipper::exception, edm::hlt::Exception, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

399 {
400 
401  std::string err_msg =
402  "Failed to create consumer statistics page";
403 
404  try
405  {
406  smWebPageHelper_->consumerStatistics(out);
407  }
408  catch( std::exception &e )
409  {
410  err_msg += ": ";
411  err_msg += e.what();
412  LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
413  XCEPT_RAISE( xgi::exception::Exception, err_msg );
414  }
415  catch(...)
416  {
417  err_msg += ": Unknown exception";
418  LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
419  XCEPT_RAISE( xgi::exception::Exception, err_msg );
420  }
421 
422 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void StorageManager::css ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Webinterface callback for style sheet

Definition at line 335 of file StorageManager.cc.

References recoMuon::in, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

337 {
338  smWebPageHelper_->css(in,out);
339 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void StorageManager::defaultWebPage ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Webinterface callback creating default web page

Definition at line 342 of file StorageManager.cc.

References ExpressReco_HICollisions_FallBack::e, cmsCodeRules.cppFunctionSkipper::exception, edm::hlt::Exception, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

344 {
345  std::string errorMsg = "Failed to create the default webpage";
346 
347  try
348  {
349  smWebPageHelper_->defaultWebPage(out);
350  }
351  catch(std::exception &e)
352  {
353  errorMsg += ": ";
354  errorMsg += e.what();
355 
356  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
357  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
358  }
359  catch(...)
360  {
361  errorMsg += ": Unknown exception";
362 
363  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
364  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
365  }
366 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void StorageManager::dqmEventStatisticsWebPage ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Webinterface callback creating web page showing statistics about the processed DQM events.

Definition at line 518 of file StorageManager.cc.

References ExpressReco_HICollisions_FallBack::e, cmsCodeRules.cppFunctionSkipper::exception, edm::hlt::Exception, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

520 {
521  std::string errorMsg = "Failed to create the DQM event statistics webpage";
522 
523  try
524  {
525  smWebPageHelper_->dqmEventWebPage(out);
526  }
527  catch(std::exception &e)
528  {
529  errorMsg += ": ";
530  errorMsg += e.what();
531 
532  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
533  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
534  }
535  catch(...)
536  {
537  errorMsg += ": Unknown exception";
538 
539  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
540  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
541  }
542 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void StorageManager::fileStatisticsWebPage ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Webinterface callback creating web page showing information about recently written files

Definition at line 490 of file StorageManager.cc.

References ExpressReco_HICollisions_FallBack::e, cmsCodeRules.cppFunctionSkipper::exception, edm::hlt::Exception, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

492 {
493  std::string errorMsg = "Failed to create the file statistics webpage";
494 
495  try
496  {
497  smWebPageHelper_->filesWebPage(out);
498  }
499  catch(std::exception &e)
500  {
501  errorMsg += ": ";
502  errorMsg += e.what();
503 
504  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
505  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
506  }
507  catch(...)
508  {
509  errorMsg += ": Unknown exception";
510 
511  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
512  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
513  }
514 
515 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
xoap::MessageReference StorageManager::handleFSMSoapMessage ( xoap::MessageReference  msg)
throw (xoap::exception::Exception
)
private

Callback for SOAP message containing a state machine event, possibly including new configuration values

Definition at line 590 of file StorageManager.cc.

References edmPickEvents::command, stor::soaputils::createFsmSoapResponseMsg(), ExpressReco_HICollisions_FallBack::e, cmsCodeRules.cppFunctionSkipper::exception, edm::hlt::Exception, cms::Exception::explainSelf(), stor::soaputils::extractParameters(), and runTheMatrix::msg.

Referenced by bindStateMachineCallbacks().

592 {
593  std::string errorMsg;
594  xoap::MessageReference returnMsg;
595 
596  try {
597  errorMsg = "Failed to extract FSM event and parameters from SOAP message: ";
598  std::string command = soaputils::extractParameters(msg, this);
599 
600  errorMsg = "Failed to put a '" + command + "' state machine event into command queue: ";
601  if (command == "Configure")
602  {
603  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Configure() ) );
604  }
605  else if (command == "Enable")
606  {
607  if (sharedResources_->configuration_->streamConfigurationHasChanged())
608  {
609  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Reconfigure() ) );
610  }
611  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Enable() ) );
612  }
613  else if (command == "Stop")
614  {
615  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Stop() ) );
616  }
617  else if (command == "Halt")
618  {
619  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Halt() ) );
620  }
621  else if (command == "EmergencyStop")
622  {
623  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::EmergencyStop() ) );
624  }
625  else
626  {
627  XCEPT_RAISE(stor::exception::StateMachine,
628  "Received an unknown state machine event '" + command + "'.");
629  }
630 
631  errorMsg = "Failed to create FSM SOAP reply message: ";
632  returnMsg = soaputils::createFsmSoapResponseMsg(command,
633  sharedResources_->statisticsReporter_->
634  getStateMachineMonitorCollection().externallyVisibleState());
635  }
636  catch (cms::Exception& e) {
637  errorMsg += e.explainSelf();
638  XCEPT_DECLARE(xoap::exception::Exception,
639  sentinelException, errorMsg);
640  sharedResources_->moveToFailedState( sentinelException );
641  throw sentinelException;
642  }
643  catch (xcept::Exception &e) {
644  XCEPT_DECLARE_NESTED(xoap::exception::Exception,
645  sentinelException, errorMsg, e);
646  sharedResources_->moveToFailedState( sentinelException );
647  throw sentinelException;
648  }
649  catch (std::exception& e) {
650  errorMsg += e.what();
651  XCEPT_DECLARE(xoap::exception::Exception,
652  sentinelException, errorMsg);
653  sharedResources_->moveToFailedState( sentinelException );
654  throw sentinelException;
655  }
656  catch (...) {
657  errorMsg += "Unknown exception";
658  XCEPT_DECLARE(xoap::exception::Exception,
659  sentinelException, errorMsg);
660  sharedResources_->moveToFailedState( sentinelException );
661  throw sentinelException;
662  }
663 
664  return returnMsg;
665 }
virtual std::string explainSelf() const
Definition: Exception.cc:56
SharedResourcesPtr sharedResources_
boost::shared_ptr< boost::statechart::event_base > EventPtr_t
Definition: CommandQueue.h:21
xoap::MessageReference createFsmSoapResponseMsg(const std::string commandName, const std::string currentState)
Definition: SoapUtils.cc:38
std::string extractParameters(xoap::MessageReference, xdaq::Application *)
Definition: SoapUtils.cc:25
void StorageManager::initializeSharedResources ( )
private

Initialize the shared resources

Definition at line 150 of file StorageManager.cc.

References stor::QueueConfigurationParams::commandQueueSize_, edm::errors::Configuration, consumerUtils_, stor::QueueConfigurationParams::dqmEventQueueMemoryLimitMB_, stor::QueueConfigurationParams::dqmEventQueueSize_, stor::QueueConfigurationParams::fragmentQueueMemoryLimitMB_, stor::QueueConfigurationParams::fragmentQueueSize_, stor::QueueConfigurationParams::registrationQueueSize_, reset(), sharedResources_, smWebPageHelper_, stor::QueueConfigurationParams::streamQueueMemoryLimitMB_, and stor::QueueConfigurationParams::streamQueueSize_.

Referenced by StorageManager().

151 {
152  sharedResources_.reset(new SharedResources());
153 
154  xdata::InfoSpace *ispace = getApplicationInfoSpace();
155  unsigned long instance = getApplicationDescriptor()->getInstance();
156  sharedResources_->configuration_.reset(new Configuration(ispace, instance));
157 
158  QueueConfigurationParams queueParams =
159  sharedResources_->configuration_->getQueueConfigurationParams();
160  sharedResources_->commandQueue_.
161  reset(new CommandQueue(queueParams.commandQueueSize_));
162  sharedResources_->fragmentQueue_.
163  reset(new FragmentQueue(queueParams.fragmentQueueSize_, queueParams.fragmentQueueMemoryLimitMB_ * 1024*1024));
164  sharedResources_->registrationQueue_.
166  sharedResources_->streamQueue_.
167  reset(new StreamQueue(queueParams.streamQueueSize_, queueParams.streamQueueMemoryLimitMB_ * 1024*1024));
168  sharedResources_->dqmEventQueue_.
169  reset(new DQMEventQueue(queueParams.dqmEventQueueSize_, queueParams.dqmEventQueueMemoryLimitMB_ * 1024*1024));
170 
171  sharedResources_->statisticsReporter_.reset(
173  );
174  sharedResources_->initMsgCollection_.reset(new InitMsgCollection());
175  sharedResources_->diskWriterResources_.reset(new DiskWriterResources());
176  sharedResources_->dqmEventProcessorResources_.reset(new DQMEventProcessorResources());
177 
178  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setFragmentQueue(sharedResources_->fragmentQueue_);
179  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setStreamQueue(sharedResources_->streamQueue_);
180  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setDQMEventQueue(sharedResources_->dqmEventQueue_);
181 
183  discardManager_.reset(new DiscardManager(getApplicationContext(),
184  getApplicationDescriptor(),
185  sharedResources_->statisticsReporter_->
186  getDataSenderMonitorCollection()));
187 
188  sharedResources_->registrationCollection_.reset( new RegistrationCollection() );
190  sharedResources_->statisticsReporter_->getEventConsumerMonitorCollection();
191  sharedResources_->eventQueueCollection_.reset( new EventQueueCollection( ecmc ) );
192 
194  sharedResources_->statisticsReporter_->getDQMConsumerMonitorCollection();
195  sharedResources_->dqmEventQueueCollection_.reset( new DQMEventQueueCollection( dcmc ) );
196 
197  consumerUtils_.reset( new ConsumerUtils_t(
198  sharedResources_->configuration_,
199  sharedResources_->registrationCollection_,
200  sharedResources_->registrationQueue_,
201  sharedResources_->initMsgCollection_,
202  sharedResources_->eventQueueCollection_,
203  sharedResources_->dqmEventQueueCollection_,
204  sharedResources_->statisticsReporter_->alarmHandler()
205  ) );
206 
208  getApplicationDescriptor(), sharedResources_));
209 
210 }
unsigned int dqmEventQueueMemoryLimitMB_
Definition: Configuration.h:91
ConsumerUtils< Configuration, EventQueueCollection > ConsumerUtils_t
unsigned int fragmentQueueMemoryLimitMB_
Definition: Configuration.h:93
ConcurrentQueue< RegPtr > RegistrationQueue
unsigned int streamQueueMemoryLimitMB_
Definition: Configuration.h:96
SharedResourcesPtr sharedResources_
ConcurrentQueue< I2OChain, KeepNewest< I2OChain > > DQMEventQueue
Definition: DQMEventQueue.h:22
ConcurrentQueue< I2OChain > StreamQueue
Definition: StreamQueue.h:21
ConcurrentQueue< EventPtr_t > CommandQueue
Definition: CommandQueue.h:22
QueueCollection< I2OChain > EventQueueCollection
QueueCollection< DQMTopLevelFolder::Record > DQMEventQueueCollection
void reset(double vett[256])
Definition: TPedValues.cc:11
ConcurrentQueue< I2OChain > FragmentQueue
Definition: FragmentQueue.h:21
boost::scoped_ptr< ConsumerUtils_t > consumerUtils_
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
StorageManager& stor::StorageManager::operator= ( StorageManager const &  )
private
void StorageManager::processConsumerEventRequest ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Callback handling event consumer event request

Definition at line 689 of file StorageManager.cc.

References recoMuon::in, and dbtoconf::out.

Referenced by bindConsumerCallbacks().

691 {
692  consumerUtils_->processConsumerEventRequest(in,out);
693 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< ConsumerUtils_t > consumerUtils_
void StorageManager::processConsumerHeaderRequest ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Callback handling event consumer init message request

Definition at line 681 of file StorageManager.cc.

References recoMuon::in, and dbtoconf::out.

Referenced by bindConsumerCallbacks().

683 {
684  consumerUtils_->processConsumerHeaderRequest(in,out);
685 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< ConsumerUtils_t > consumerUtils_
void StorageManager::processConsumerRegistrationRequest ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Callback handling event consumer registration request

Definition at line 673 of file StorageManager.cc.

References recoMuon::in, and dbtoconf::out.

Referenced by bindConsumerCallbacks().

675 {
676  consumerUtils_->processConsumerRegistrationRequest(in,out);
677 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< ConsumerUtils_t > consumerUtils_
void StorageManager::processDQMConsumerEventRequest ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Callback handling DQM event consumer DQM event request

Definition at line 705 of file StorageManager.cc.

References recoMuon::in, and dbtoconf::out.

Referenced by bindConsumerCallbacks().

707 {
708  consumerUtils_->processDQMConsumerEventRequest(in,out);
709 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< ConsumerUtils_t > consumerUtils_
void StorageManager::processDQMConsumerRegistrationRequest ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Callback handling DQM event consumer registration request

Definition at line 697 of file StorageManager.cc.

References recoMuon::in, and dbtoconf::out.

Referenced by bindConsumerCallbacks().

699 {
700  consumerUtils_->processDQMConsumerRegistrationRequest(in,out);
701 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< ConsumerUtils_t > consumerUtils_
void StorageManager::rbsenderDetailWebPage ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Webinterface callback creating web page showing detailed information about the resource broker sending data.

Definition at line 453 of file StorageManager.cc.

References ExpressReco_HICollisions_FallBack::e, cmsCodeRules.cppFunctionSkipper::exception, edm::hlt::Exception, recoMuon::in, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

455 {
456  std::string errorMsg = "Failed to create the data sender webpage";
457 
458  try
459  {
460  long long localRBID = 0;
461  cgicc::Cgicc cgiWrapper(in);
462  cgicc::const_form_iterator updateRef = cgiWrapper.getElement("id");
463  if (updateRef != cgiWrapper.getElements().end())
464  {
465  std::string idString = updateRef->getValue();
466  localRBID = boost::lexical_cast<long long>(idString);
467  }
468 
469  smWebPageHelper_->resourceBrokerDetail(out, localRBID);
470  }
471  catch(std::exception &e)
472  {
473  errorMsg += ": ";
474  errorMsg += e.what();
475 
476  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
477  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
478  }
479  catch(...)
480  {
481  errorMsg += ": Unknown exception";
482 
483  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
484  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
485  }
486 
487 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void StorageManager::rbsenderWebPage ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Webinterface callback creating web page showing summary information about the resource broker sending data.

Definition at line 425 of file StorageManager.cc.

References ExpressReco_HICollisions_FallBack::e, cmsCodeRules.cppFunctionSkipper::exception, edm::hlt::Exception, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

427 {
428  std::string errorMsg = "Failed to create the data sender webpage";
429 
430  try
431  {
432  smWebPageHelper_->resourceBrokerOverview(out);
433  }
434  catch(std::exception &e)
435  {
436  errorMsg += ": ";
437  errorMsg += e.what();
438 
439  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
440  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
441  }
442  catch(...)
443  {
444  errorMsg += ": Unknown exception";
445 
446  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
447  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
448  }
449 
450 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void StorageManager::receiveDataMessage ( toolbox::mem::Reference *  ref)
private

Callback for I2O message containing an event

Definition at line 270 of file StorageManager.cc.

References stor::FragmentMonitorCollection::addEventFragmentSample(), csvReporter::r, rand(), sharedResources_, and stor::I2OChain::totalDataSize().

Referenced by bindI2OCallbacks().

271 {
272  I2OChain i2oChain(ref);
273 
274  FragmentMonitorCollection& fragMonCollection =
275  sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
276  fragMonCollection.addEventFragmentSample( i2oChain.totalDataSize() );
277 
278  sharedResources_->fragmentQueue_->enqWait(i2oChain);
279 
280 #ifdef STOR_DEBUG_DUPLICATE_MESSAGES
281  double r = rand()/static_cast<double>(RAND_MAX);
282  if (r < 0.001)
283  {
284  LOG4CPLUS_INFO(this->getApplicationLogger(), "Simulating duplicated data message");
285  receiveDataMessage(ref->duplicate());
286  }
287 #endif
288 }
void receiveDataMessage(toolbox::mem::Reference *ref)
SharedResourcesPtr sharedResources_
Signal rand(Signal arg)
Definition: vlib.cc:442
void addEventFragmentSample(const double bytecount)
void StorageManager::receiveDQMMessage ( toolbox::mem::Reference *  ref)
private

Callback for I2O message containing a DQM event (histograms)

Definition at line 303 of file StorageManager.cc.

References stor::FragmentMonitorCollection::addDQMEventFragmentSample(), sharedResources_, and stor::I2OChain::totalDataSize().

Referenced by bindI2OCallbacks().

304 {
305  I2OChain i2oChain(ref);
306 
307  FragmentMonitorCollection& fragMonCollection =
308  sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
309  fragMonCollection.addDQMEventFragmentSample( i2oChain.totalDataSize() );
310 
311  sharedResources_->fragmentQueue_->enqWait(i2oChain);
312 }
SharedResourcesPtr sharedResources_
void addDQMEventFragmentSample(const double bytecount)
void StorageManager::receiveEndOfLumiSectionMessage ( toolbox::mem::Reference *  ref)
private

Callback for I2O message notifying the end-of-lumi-section

Definition at line 315 of file StorageManager.cc.

References stor::FragmentMonitorCollection::addFragmentSample(), stor::MonitoredQuantity::addSample(), stor::RunMonitorCollection::getEoLSSeenMQ(), stor::I2OChain::lumiSection(), sharedResources_, and stor::I2OChain::totalDataSize().

Referenced by bindI2OCallbacks().

316 {
317  I2OChain i2oChain( ref );
318 
319  FragmentMonitorCollection& fragMonCollection =
320  sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
321  fragMonCollection.addFragmentSample( i2oChain.totalDataSize() );
322 
323  RunMonitorCollection& runMonCollection =
324  sharedResources_->statisticsReporter_->getRunMonitorCollection();
325  runMonCollection.getEoLSSeenMQ().addSample( i2oChain.lumiSection() );
326 
327  sharedResources_->streamQueue_->enqWait( i2oChain );
328 }
void addSample(const double &value=1)
SharedResourcesPtr sharedResources_
const MonitoredQuantity & getEoLSSeenMQ() const
void addFragmentSample(const double bytecount)
void StorageManager::receiveErrorDataMessage ( toolbox::mem::Reference *  ref)
private

Callback for I2O message containing an error event

Definition at line 291 of file StorageManager.cc.

References stor::FragmentMonitorCollection::addEventFragmentSample(), sharedResources_, and stor::I2OChain::totalDataSize().

Referenced by bindI2OCallbacks().

292 {
293  I2OChain i2oChain(ref);
294 
295  FragmentMonitorCollection& fragMonCollection =
296  sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
297  fragMonCollection.addEventFragmentSample( i2oChain.totalDataSize() );
298 
299  sharedResources_->fragmentQueue_->enqWait(i2oChain);
300 }
SharedResourcesPtr sharedResources_
void addEventFragmentSample(const double bytecount)
void StorageManager::receiveRegistryMessage ( toolbox::mem::Reference *  ref)
private

Callback for I2O message containing an init message

Definition at line 251 of file StorageManager.cc.

References stor::MonitoredQuantity::addSample(), stor::FragmentMonitorCollection::getAllFragmentSizeMQ(), stor::ThroughputMonitorCollection::setMemoryPoolPointer(), sharedResources_, and stor::I2OChain::totalDataSize().

Referenced by bindI2OCallbacks().

252 {
253  I2OChain i2oChain(ref);
254 
255  // Set the I2O message pool pointer. Only done for init messages.
256  ThroughputMonitorCollection& throughputMonCollection =
257  sharedResources_->statisticsReporter_->getThroughputMonitorCollection();
258  throughputMonCollection.setMemoryPoolPointer( ref->getBuffer()->getPool() );
259 
260  FragmentMonitorCollection& fragMonCollection =
261  sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
262  fragMonCollection.getAllFragmentSizeMQ().addSample(
263  static_cast<double>( i2oChain.totalDataSize() ) / 0x100000
264  );
265 
266  sharedResources_->fragmentQueue_->enqWait(i2oChain);
267 }
const MonitoredQuantity & getAllFragmentSizeMQ() const
void addSample(const double &value=1)
SharedResourcesPtr sharedResources_
void setMemoryPoolPointer(toolbox::mem::Pool *)
void StorageManager::startWorkerThreads ( )
private

Create and start all worker threads

Definition at line 213 of file StorageManager.cc.

References diskWriter_, dqmEventProcessor_, ExpressReco_HICollisions_FallBack::e, cmsCodeRules.cppFunctionSkipper::exception, edm::hlt::Exception, fragmentProcessor_, and sharedResources_.

Referenced by StorageManager().

214 {
215 
216  // Start the workloops
217  try
218  {
220  diskWriter_.reset( new DiskWriter(this, sharedResources_) );
222  sharedResources_->statisticsReporter_->startWorkLoop("theStatisticsReporter");
223  fragmentProcessor_->startWorkLoop("theFragmentProcessor");
224  diskWriter_->startWorkLoop("theDiskWriter");
225  dqmEventProcessor_->startWorkLoop("theDQMEventProcessor");
226  }
227  catch(xcept::Exception &e)
228  {
229  sharedResources_->moveToFailedState( e );
230  }
231  catch(std::exception &e)
232  {
233  XCEPT_DECLARE(stor::exception::Exception,
234  sentinelException, e.what());
235  sharedResources_->moveToFailedState( sentinelException );
236  }
237  catch(...)
238  {
239  std::string errorMsg = "Unknown exception when starting the workloops";
240  XCEPT_DECLARE(stor::exception::Exception,
241  sentinelException, errorMsg);
242  sharedResources_->moveToFailedState( sentinelException );
243  }
244 }
boost::scoped_ptr< DQMEventProcessor > dqmEventProcessor_
boost::scoped_ptr< DiskWriter > diskWriter_
SharedResourcesPtr sharedResources_
boost::scoped_ptr< FragmentProcessor > fragmentProcessor_
void StorageManager::storedDataWebPage ( xgi::Input *  in, xgi::Output *  out ) throw (xgi::exception::Exception)
private

Web interface callback that creates the web page showing the stored data information.

Definition at line 369 of file StorageManager.cc.

References ExpressReco_HICollisions_FallBack::e, cmsCodeRules.cppFunctionSkipper::exception, edm::hlt::Exception, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

371 {
372  std::string errorMsg = "Failed to create the stored data webpage";
373 
374  try
375  {
376  smWebPageHelper_->storedDataWebPage(out);
377  }
378  catch(std::exception &e)
379  {
380  errorMsg += ": ";
381  errorMsg += e.what();
382 
383  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
384  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
385  }
386  catch(...)
387  {
388  errorMsg += ": Unknown exception";
389 
390  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
391  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
392  }
393 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void StorageManager::throughputWebPage ( xgi::Input *  in, xgi::Output *  out ) throw (xgi::exception::Exception)
private

Web interface callback that creates the web page showing statistics about the data throughput in the SM.

Definition at line 545 of file StorageManager.cc.

References ExpressReco_HICollisions_FallBack::e, cmsCodeRules.cppFunctionSkipper::exception, edm::hlt::Exception, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

547 {
548  std::string errorMsg = "Failed to create the throughput statistics webpage";
549 
550  try
551  {
552  smWebPageHelper_->throughputWebPage(out);
553  }
554  catch(std::exception &e)
555  {
556  errorMsg += ": ";
557  errorMsg += e.what();
558 
559  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
560  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
561  }
562  catch(...)
563  {
564  errorMsg += ": Unknown exception";
565 
566  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
567  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
568  }
569 
570 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_

Member Data Documentation

boost::scoped_ptr<ConsumerUtils_t> stor::StorageManager::consumerUtils_
private

Definition at line 223 of file StorageManager.h.

Referenced by initializeSharedResources().

boost::scoped_ptr<DiskWriter> stor::StorageManager::diskWriter_
private

Definition at line 219 of file StorageManager.h.

Referenced by startWorkerThreads().

boost::scoped_ptr<DQMEventProcessor> stor::StorageManager::dqmEventProcessor_
private

Definition at line 220 of file StorageManager.h.

Referenced by startWorkerThreads().

boost::scoped_ptr<FragmentProcessor> stor::StorageManager::fragmentProcessor_
private

Definition at line 218 of file StorageManager.h.

Referenced by startWorkerThreads().

SharedResourcesPtr stor::StorageManager::sharedResources_
private
boost::scoped_ptr<SMWebPageHelper> stor::StorageManager::smWebPageHelper_
private

Definition at line 224 of file StorageManager.h.

Referenced by initializeSharedResources().