CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Types | Private Member Functions | Private Attributes
stor::StorageManager Class Reference

#include <StorageManager.h>

Inheritance diagram for stor::StorageManager:

Public Member Functions

 StorageManager (xdaq::ApplicationStub *s)
 

Private Types

typedef ConsumerUtils
< Configuration,
EventQueueCollection
ConsumerUtils_t
 

Private Member Functions

void bindConsumerCallbacks ()
 
void bindI2OCallbacks ()
 
void bindStateMachineCallbacks ()
 
void bindWebInterfaceCallbacks ()
 
void consumerListWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void consumerStatisticsPage (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception )
 
void css (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void defaultWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void dqmEventStatisticsWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void fileStatisticsWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
xoap::MessageReference handleFSMSoapMessage (xoap::MessageReference) throw ( xoap::exception::Exception )
 
void initializeSharedResources ()
 
void inputWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
StorageManageroperator= (StorageManager const &)
 
void processConsumerEventRequest (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception )
 
void processConsumerHeaderRequest (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception )
 
void processConsumerRegistrationRequest (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception )
 
void processDQMConsumerEventRequest (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception )
 
void processDQMConsumerRegistrationRequest (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception )
 
void rbsenderDetailWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void rbsenderWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void receiveDataMessage (toolbox::mem::Reference *ref)
 
void receiveDQMMessage (toolbox::mem::Reference *ref)
 
void receiveEndOfLumiSectionMessage (toolbox::mem::Reference *ref)
 
void receiveErrorDataMessage (toolbox::mem::Reference *ref)
 
void receiveRegistryMessage (toolbox::mem::Reference *ref)
 
void startWorkerThreads ()
 
 StorageManager (StorageManager const &)
 
void storedDataWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void throughputWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 

Private Attributes

boost::scoped_ptr
< ConsumerUtils_t
consumerUtils_
 
boost::scoped_ptr< DiskWriterdiskWriter_
 
boost::scoped_ptr
< DQMEventProcessor
dqmEventProcessor_
 
boost::scoped_ptr
< FragmentProcessor
fragmentProcessor_
 
SharedResourcesPtr sharedResources_
 
boost::scoped_ptr
< SMWebPageHelper
smWebPageHelper_
 

Detailed Description

Main class of the StorageManager XDAQ application

Author:
mommsen
Revision:
1.61
Date:
2011/11/17 17:35:40

Definition at line 46 of file StorageManager.h.

Member Typedef Documentation

Definition at line 230 of file StorageManager.h.

Constructor & Destructor Documentation

StorageManager::StorageManager ( xdaq::ApplicationStub *  s)

Definition at line 39 of file StorageManager.cc.

References bindConsumerCallbacks(), bindI2OCallbacks(), bindStateMachineCallbacks(), bindWebInterfaceCallbacks(), alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, initializeSharedResources(), and startWorkerThreads().

39  :
40  xdaq::Application(s)
41 {
42  LOG4CPLUS_INFO(this->getApplicationLogger(),"Making StorageManager");
43 
44  // bind all callback functions
49 
50  std::string errorMsg = "Exception in StorageManager constructor: ";
51  try
52  {
54  }
55  catch(std::exception &e)
56  {
57  errorMsg += e.what();
58  LOG4CPLUS_FATAL( getApplicationLogger(), e.what() );
59  XCEPT_RAISE( stor::exception::Exception, e.what() );
60  }
61  catch(...)
62  {
63  errorMsg += "unknown exception";
64  LOG4CPLUS_FATAL( getApplicationLogger(), errorMsg );
65  XCEPT_RAISE( stor::exception::Exception, errorMsg );
66  }
67 
69 }
stor::StorageManager::StorageManager ( StorageManager const &  )
private

Member Function Documentation

void StorageManager::bindConsumerCallbacks ( )
private

Bind callbacks for consumers

Definition at line 138 of file StorageManager.cc.

References processConsumerEventRequest(), processConsumerHeaderRequest(), processConsumerRegistrationRequest(), processDQMConsumerEventRequest(), and processDQMConsumerRegistrationRequest().

Referenced by StorageManager().

139 {
140  // event consumers
141  xgi::bind( this, &StorageManager::processConsumerRegistrationRequest, "registerConsumer" );
142  xgi::bind( this, &StorageManager::processConsumerHeaderRequest, "getregdata" );
143  xgi::bind( this, &StorageManager::processConsumerEventRequest, "geteventdata" );
144 
145  // dqm event consumers
146  xgi::bind(this,&StorageManager::processDQMConsumerRegistrationRequest, "registerDQMConsumer");
147  xgi::bind(this,&StorageManager::processDQMConsumerEventRequest, "getDQMeventdata");
148 }
void processConsumerRegistrationRequest(xgi::Input *in, xgi::Output *out)
void processConsumerHeaderRequest(xgi::Input *in, xgi::Output *out)
void processDQMConsumerEventRequest(xgi::Input *in, xgi::Output *out)
void processDQMConsumerRegistrationRequest(xgi::Input *in, xgi::Output *out)
void processConsumerEventRequest(xgi::Input *in, xgi::Output *out)
void StorageManager::bindI2OCallbacks ( )
private

Bind callbacks for I2O message

Definition at line 72 of file StorageManager.cc.

References I2O_SM_DATA, I2O_SM_DQM, I2O_SM_ERROR, I2O_SM_PREAMBLE, receiveDataMessage(), receiveDQMMessage(), receiveEndOfLumiSectionMessage(), receiveErrorDataMessage(), and receiveRegistryMessage().

Referenced by StorageManager().

73 {
74  i2o::bind(this,
77  XDAQ_ORGANIZATION_ID);
78  i2o::bind(this,
81  XDAQ_ORGANIZATION_ID);
82  i2o::bind(this,
85  XDAQ_ORGANIZATION_ID);
86  i2o::bind(this,
88  I2O_SM_DQM,
89  XDAQ_ORGANIZATION_ID);
90  i2o::bind(this,
92  I2O_EVM_LUMISECTION,
93  XDAQ_ORGANIZATION_ID);
94 }
#define I2O_SM_ERROR
Definition: i2oEvfMsgs.h:23
#define I2O_SM_DQM
Definition: i2oEvfMsgs.h:25
void receiveEndOfLumiSectionMessage(toolbox::mem::Reference *ref)
#define I2O_SM_PREAMBLE
Definition: i2oEvfMsgs.h:21
void receiveDataMessage(toolbox::mem::Reference *ref)
void receiveErrorDataMessage(toolbox::mem::Reference *ref)
void receiveRegistryMessage(toolbox::mem::Reference *ref)
#define I2O_SM_DATA
Definition: i2oEvfMsgs.h:22
void receiveDQMMessage(toolbox::mem::Reference *ref)
void StorageManager::bindStateMachineCallbacks ( )
private

Bind callbacks for state machine SOAP messages

Definition at line 97 of file StorageManager.cc.

References handleFSMSoapMessage().

Referenced by StorageManager().

98 {
99  xoap::bind( this,
101  "Configure",
102  XDAQ_NS_URI );
103  xoap::bind( this,
105  "Enable",
106  XDAQ_NS_URI );
107  xoap::bind( this,
109  "Stop",
110  XDAQ_NS_URI );
111  xoap::bind( this,
113  "Halt",
114  XDAQ_NS_URI );
115  xoap::bind( this,
117  "EmergencyStop",
118  XDAQ_NS_URI );
119 }
xoap::MessageReference handleFSMSoapMessage(xoap::MessageReference)
void StorageManager::bindWebInterfaceCallbacks ( )
private

Bind callbacks for web interface

Definition at line 122 of file StorageManager.cc.

References consumerListWebPage(), consumerStatisticsPage(), css(), defaultWebPage(), dqmEventStatisticsWebPage(), fileStatisticsWebPage(), inputWebPage(), rbsenderDetailWebPage(), rbsenderWebPage(), storedDataWebPage(), and throughputWebPage().

Referenced by StorageManager().

123 {
124  xgi::bind(this,&StorageManager::css, "styles.css");
125  xgi::bind(this,&StorageManager::defaultWebPage, "Default");
126  xgi::bind(this,&StorageManager::inputWebPage, "input");
127  xgi::bind(this,&StorageManager::storedDataWebPage, "storedData");
128  xgi::bind(this,&StorageManager::rbsenderWebPage, "rbsenderlist");
129  xgi::bind(this,&StorageManager::rbsenderDetailWebPage, "rbsenderdetail");
130  xgi::bind(this,&StorageManager::fileStatisticsWebPage, "fileStatistics");
131  xgi::bind(this,&StorageManager::dqmEventStatisticsWebPage,"dqmEventStatistics");
132  xgi::bind(this,&StorageManager::consumerStatisticsPage, "consumerStatistics" );
133  xgi::bind(this,&StorageManager::consumerListWebPage, "consumerList");
134  xgi::bind(this,&StorageManager::throughputWebPage, "throughputStatistics");
135 }
void inputWebPage(xgi::Input *in, xgi::Output *out)
void consumerStatisticsPage(xgi::Input *in, xgi::Output *out)
void consumerListWebPage(xgi::Input *in, xgi::Output *out)
void fileStatisticsWebPage(xgi::Input *in, xgi::Output *out)
void rbsenderDetailWebPage(xgi::Input *in, xgi::Output *out)
void storedDataWebPage(xgi::Input *in, xgi::Output *out)
void css(xgi::Input *in, xgi::Output *out)
void dqmEventStatisticsWebPage(xgi::Input *in, xgi::Output *out)
void rbsenderWebPage(xgi::Input *in, xgi::Output *out)
void defaultWebPage(xgi::Input *in, xgi::Output *out)
void throughputWebPage(xgi::Input *in, xgi::Output *out)
void StorageManager::consumerListWebPage ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Callback returning a XML list of consumer information. The current implementation just returns an empty document.

Definition at line 603 of file StorageManager.cc.

References dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

605 {
606  out->getHTTPResponseHeader().addHeader( "Content-Type",
607  "application/octet-stream" );
608  out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
609  "binary" );
610  char buff;
611  out->write( &buff, 0 );
612 }
tuple out
Definition: dbtoconf.py:99
void StorageManager::consumerStatisticsPage ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Webinterface callback creating web page showing the connected consumers

Definition at line 425 of file StorageManager.cc.

References alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

428 {
429 
430  std::string err_msg =
431  "Failed to create consumer statistics page";
432 
433  try
434  {
435  smWebPageHelper_->consumerStatistics(out);
436  }
437  catch( std::exception &e )
438  {
439  err_msg += ": ";
440  err_msg += e.what();
441  LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
442  XCEPT_RAISE( xgi::exception::Exception, err_msg );
443  }
444  catch(...)
445  {
446  err_msg += ": Unknown exception";
447  LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
448  XCEPT_RAISE( xgi::exception::Exception, err_msg );
449  }
450 
451 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void StorageManager::css ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Webinterface callback for style sheet

Definition at line 337 of file StorageManager.cc.

References recoMuon::in, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

339 {
340  smWebPageHelper_->css(in,out);
341 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void StorageManager::defaultWebPage ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Webinterface callback creating default web page

Definition at line 344 of file StorageManager.cc.

References alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

346 {
347  std::string errorMsg = "Failed to create the default webpage";
348 
349  try
350  {
351  smWebPageHelper_->defaultWebPage(out);
352  }
353  catch(std::exception &e)
354  {
355  errorMsg += ": ";
356  errorMsg += e.what();
357 
358  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
359  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
360  }
361  catch(...)
362  {
363  errorMsg += ": Unknown exception";
364 
365  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
366  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
367  }
368 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void StorageManager::dqmEventStatisticsWebPage ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Webinterface callback creating web page showing statistics about the processed DQM events.

Definition at line 547 of file StorageManager.cc.

References alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

549 {
550  std::string errorMsg = "Failed to create the DQM event statistics webpage";
551 
552  try
553  {
554  smWebPageHelper_->dqmEventWebPage(out);
555  }
556  catch(std::exception &e)
557  {
558  errorMsg += ": ";
559  errorMsg += e.what();
560 
561  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
562  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
563  }
564  catch(...)
565  {
566  errorMsg += ": Unknown exception";
567 
568  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
569  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
570  }
571 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void StorageManager::fileStatisticsWebPage ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Webinterface callback creating web page showing information about recently written files

Definition at line 519 of file StorageManager.cc.

References alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

521 {
522  std::string errorMsg = "Failed to create the file statistics webpage";
523 
524  try
525  {
526  smWebPageHelper_->filesWebPage(out);
527  }
528  catch(std::exception &e)
529  {
530  errorMsg += ": ";
531  errorMsg += e.what();
532 
533  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
534  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
535  }
536  catch(...)
537  {
538  errorMsg += ": Unknown exception";
539 
540  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
541  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
542  }
543 
544 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
xoap::MessageReference StorageManager::handleFSMSoapMessage ( xoap::MessageReference  msg)
throw (xoap::exception::Exception
)
private

Callback for SOAP message containint a state machine event, possibly including new configuration values

Definition at line 619 of file StorageManager.cc.

References edmPickEvents::command, stor::soaputils::createFsmSoapResponseMsg(), alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, cms::Exception::explainSelf(), stor::soaputils::extractParameters(), and lumiQueryAPI::msg.

Referenced by bindStateMachineCallbacks().

621 {
622  std::string errorMsg;
623  xoap::MessageReference returnMsg;
624 
625  try {
626  errorMsg = "Failed to extract FSM event and parameters from SOAP message: ";
627  std::string command = soaputils::extractParameters(msg, this);
628 
629  errorMsg = "Failed to put a '" + command + "' state machine event into command queue: ";
630  if (command == "Configure")
631  {
632  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Configure() ) );
633  }
634  else if (command == "Enable")
635  {
636  if (sharedResources_->configuration_->streamConfigurationHasChanged())
637  {
638  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Reconfigure() ) );
639  }
640  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Enable() ) );
641  }
642  else if (command == "Stop")
643  {
644  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Stop() ) );
645  }
646  else if (command == "Halt")
647  {
648  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Halt() ) );
649  }
650  else if (command == "EmergencyStop")
651  {
652  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::EmergencyStop() ) );
653  }
654  else
655  {
656  XCEPT_RAISE(stor::exception::StateMachine,
657  "Received an unknown state machine event '" + command + "'.");
658  }
659 
660  errorMsg = "Failed to create FSM SOAP reply message: ";
661  returnMsg = soaputils::createFsmSoapResponseMsg(command,
662  sharedResources_->statisticsReporter_->
663  getStateMachineMonitorCollection().externallyVisibleState());
664  }
665  catch (cms::Exception& e) {
666  errorMsg += e.explainSelf();
667  XCEPT_DECLARE(xoap::exception::Exception,
668  sentinelException, errorMsg);
669  sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
670  throw sentinelException;
671  }
672  catch (xcept::Exception &e) {
673  XCEPT_DECLARE_NESTED(xoap::exception::Exception,
674  sentinelException, errorMsg, e);
675  sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
676  throw sentinelException;
677  }
678  catch (std::exception& e) {
679  errorMsg += e.what();
680  XCEPT_DECLARE(xoap::exception::Exception,
681  sentinelException, errorMsg);
682  sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
683  throw sentinelException;
684  }
685  catch (...) {
686  errorMsg += "Unknown exception";
687  XCEPT_DECLARE(xoap::exception::Exception,
688  sentinelException, errorMsg);
689  sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
690  throw sentinelException;
691  }
692 
693  return returnMsg;
694 }
virtual std::string explainSelf() const
Definition: Exception.cc:146
SharedResourcesPtr sharedResources_
boost::shared_ptr< boost::statechart::event_base > EventPtr_t
Definition: CommandQueue.h:21
xoap::MessageReference createFsmSoapResponseMsg(const std::string commandName, const std::string currentState)
Definition: SoapUtils.cc:38
std::string extractParameters(xoap::MessageReference, xdaq::Application *)
Definition: SoapUtils.cc:25
void StorageManager::initializeSharedResources ( )
private

Initialize the shared resources

Definition at line 151 of file StorageManager.cc.

References stor::QueueConfigurationParams::commandQueueSize_, edm::errors::Configuration, consumerUtils_, stor::QueueConfigurationParams::dqmEventQueueMemoryLimitMB_, stor::QueueConfigurationParams::dqmEventQueueSize_, stor::QueueConfigurationParams::fragmentQueueMemoryLimitMB_, stor::QueueConfigurationParams::fragmentQueueSize_, instance, stor::QueueConfigurationParams::registrationQueueSize_, reset(), sharedResources_, smWebPageHelper_, stor::QueueConfigurationParams::streamQueueMemoryLimitMB_, and stor::QueueConfigurationParams::streamQueueSize_.

Referenced by StorageManager().

152 {
153  sharedResources_.reset(new SharedResources());
154 
155  xdata::InfoSpace *ispace = getApplicationInfoSpace();
156  unsigned long instance = getApplicationDescriptor()->getInstance();
157  sharedResources_->configuration_.reset(new Configuration(ispace, instance));
158 
159  QueueConfigurationParams queueParams =
160  sharedResources_->configuration_->getQueueConfigurationParams();
161  sharedResources_->commandQueue_.
162  reset(new CommandQueue(queueParams.commandQueueSize_));
163  sharedResources_->fragmentQueue_.
164  reset(new FragmentQueue(queueParams.fragmentQueueSize_, queueParams.fragmentQueueMemoryLimitMB_ * 1024*1024));
165  sharedResources_->registrationQueue_.
167  sharedResources_->streamQueue_.
168  reset(new StreamQueue(queueParams.streamQueueSize_, queueParams.streamQueueMemoryLimitMB_ * 1024*1024));
169  sharedResources_->dqmEventQueue_.
170  reset(new DQMEventQueue(queueParams.dqmEventQueueSize_, queueParams.dqmEventQueueMemoryLimitMB_ * 1024*1024));
171 
172  sharedResources_->alarmHandler_.reset( new AlarmHandler(this, sharedResources_) );
173  sharedResources_->statisticsReporter_.reset(
175  );
176  sharedResources_->initMsgCollection_.reset(new InitMsgCollection());
177  sharedResources_->diskWriterResources_.reset(new DiskWriterResources());
178  sharedResources_->dqmEventProcessorResources_.reset(new DQMEventProcessorResources());
179 
180  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setFragmentQueue(sharedResources_->fragmentQueue_);
181  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setStreamQueue(sharedResources_->streamQueue_);
182  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setDQMEventQueue(sharedResources_->dqmEventQueue_);
183 
185  discardManager_.reset(new DiscardManager(getApplicationContext(),
186  getApplicationDescriptor(),
187  sharedResources_->statisticsReporter_->
188  getDataSenderMonitorCollection()));
189 
190  sharedResources_->registrationCollection_.reset( new RegistrationCollection() );
192  sharedResources_->statisticsReporter_->getEventConsumerMonitorCollection();
193  sharedResources_->eventQueueCollection_.reset( new EventQueueCollection( ecmc ) );
194 
196  sharedResources_->statisticsReporter_->getDQMConsumerMonitorCollection();
197  sharedResources_->dqmEventQueueCollection_.reset( new DQMEventQueueCollection( dcmc ) );
198 
199  consumerUtils_.reset( new ConsumerUtils_t(
200  sharedResources_->configuration_,
201  sharedResources_->registrationCollection_,
202  sharedResources_->registrationQueue_,
203  sharedResources_->initMsgCollection_,
204  sharedResources_->eventQueueCollection_,
205  sharedResources_->dqmEventQueueCollection_,
206  sharedResources_->alarmHandler_
207  ) );
208 
210  getApplicationDescriptor(), sharedResources_));
211 
212 }
unsigned int dqmEventQueueMemoryLimitMB_
Definition: Configuration.h:91
static PFTauRenderPlugin instance
ConsumerUtils< Configuration, EventQueueCollection > ConsumerUtils_t
unsigned int fragmentQueueMemoryLimitMB_
Definition: Configuration.h:93
ConcurrentQueue< RegPtr > RegistrationQueue
unsigned int streamQueueMemoryLimitMB_
Definition: Configuration.h:96
SharedResourcesPtr sharedResources_
ConcurrentQueue< I2OChain, KeepNewest< I2OChain > > DQMEventQueue
Definition: DQMEventQueue.h:22
ConcurrentQueue< I2OChain > StreamQueue
Definition: StreamQueue.h:21
ConcurrentQueue< EventPtr_t > CommandQueue
Definition: CommandQueue.h:22
QueueCollection< I2OChain > EventQueueCollection
QueueCollection< DQMTopLevelFolder::Record > DQMEventQueueCollection
void reset(double vett[256])
Definition: TPedValues.cc:11
ConcurrentQueue< I2OChain > FragmentQueue
Definition: FragmentQueue.h:21
boost::scoped_ptr< ConsumerUtils_t > consumerUtils_
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void StorageManager::inputWebPage ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Webinterface callback creating web page showing the I2O input information

Definition at line 371 of file StorageManager.cc.

References alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

373 {
374  std::string errorMsg = "Failed to create the I2O input webpage";
375 
376  try
377  {
378  smWebPageHelper_->inputWebPage(out);
379  }
380  catch(std::exception &e)
381  {
382  errorMsg += ": ";
383  errorMsg += e.what();
384 
385  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
386  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
387  }
388  catch(...)
389  {
390  errorMsg += ": Unknown exception";
391 
392  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
393  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
394  }
395 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
StorageManager& stor::StorageManager::operator= ( StorageManager const &  )
private
void StorageManager::processConsumerEventRequest ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Callback handling event consumer event request

Definition at line 718 of file StorageManager.cc.

References recoMuon::in, and dbtoconf::out.

Referenced by bindConsumerCallbacks().

720 {
721  consumerUtils_->processConsumerEventRequest(in,out);
722 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< ConsumerUtils_t > consumerUtils_
void StorageManager::processConsumerHeaderRequest ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Callback handling event consumer init message request

Definition at line 710 of file StorageManager.cc.

References recoMuon::in, and dbtoconf::out.

Referenced by bindConsumerCallbacks().

712 {
713  consumerUtils_->processConsumerHeaderRequest(in,out);
714 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< ConsumerUtils_t > consumerUtils_
void StorageManager::processConsumerRegistrationRequest ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Callback handling event consumer registration request

Definition at line 702 of file StorageManager.cc.

References recoMuon::in, and dbtoconf::out.

Referenced by bindConsumerCallbacks().

704 {
705  consumerUtils_->processConsumerRegistrationRequest(in,out);
706 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< ConsumerUtils_t > consumerUtils_
void StorageManager::processDQMConsumerEventRequest ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Callback handling DQM event consumer DQM event request

Definition at line 734 of file StorageManager.cc.

References recoMuon::in, and dbtoconf::out.

Referenced by bindConsumerCallbacks().

736 {
737  consumerUtils_->processDQMConsumerEventRequest(in,out);
738 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< ConsumerUtils_t > consumerUtils_
void StorageManager::processDQMConsumerRegistrationRequest ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Callback handling DQM event consumer registration request

Definition at line 726 of file StorageManager.cc.

References recoMuon::in, and dbtoconf::out.

Referenced by bindConsumerCallbacks().

728 {
729  consumerUtils_->processDQMConsumerRegistrationRequest(in,out);
730 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< ConsumerUtils_t > consumerUtils_
void StorageManager::rbsenderDetailWebPage ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Webinterface callback creating web page showing detailed information about the resource broker sending data.

Definition at line 482 of file StorageManager.cc.

References alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, recoMuon::in, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

484 {
485  std::string errorMsg = "Failed to create the data sender webpage";
486 
487  try
488  {
489  long long localRBID = 0;
490  cgicc::Cgicc cgiWrapper(in);
491  cgicc::const_form_iterator updateRef = cgiWrapper.getElement("id");
492  if (updateRef != cgiWrapper.getElements().end())
493  {
494  std::string idString = updateRef->getValue();
495  localRBID = boost::lexical_cast<long long>(idString);
496  }
497 
498  smWebPageHelper_->resourceBrokerDetail(out, localRBID);
499  }
500  catch(std::exception &e)
501  {
502  errorMsg += ": ";
503  errorMsg += e.what();
504 
505  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
506  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
507  }
508  catch(...)
509  {
510  errorMsg += ": Unknown exception";
511 
512  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
513  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
514  }
515 
516 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void StorageManager::rbsenderWebPage ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Webinterface callback creating web page showing summary information about the resource broker sending data.

Definition at line 454 of file StorageManager.cc.

References alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

456 {
457  std::string errorMsg = "Failed to create the data sender webpage";
458 
459  try
460  {
461  smWebPageHelper_->resourceBrokerOverview(out);
462  }
463  catch(std::exception &e)
464  {
465  errorMsg += ": ";
466  errorMsg += e.what();
467 
468  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
469  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
470  }
471  catch(...)
472  {
473  errorMsg += ": Unknown exception";
474 
475  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
476  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
477  }
478 
479 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void StorageManager::receiveDataMessage ( toolbox::mem::Reference *  ref)
private

Callback for I2O message containing an event

Definition at line 272 of file StorageManager.cc.

References stor::FragmentMonitorCollection::addEventFragmentSample(), alignCSCRings::r, rand(), sharedResources_, and stor::I2OChain::totalDataSize().

Referenced by bindI2OCallbacks().

273 {
274  I2OChain i2oChain(ref);
275 
276  FragmentMonitorCollection& fragMonCollection =
277  sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
278  fragMonCollection.addEventFragmentSample( i2oChain.totalDataSize() );
279 
280  sharedResources_->fragmentQueue_->enqWait(i2oChain);
281 
282 #ifdef STOR_DEBUG_DUPLICATE_MESSAGES
283  double r = rand()/static_cast<double>(RAND_MAX);
284  if (r < 0.001)
285  {
286  LOG4CPLUS_INFO(this->getApplicationLogger(), "Simulating duplicated data message");
287  receiveDataMessage(ref->duplicate());
288  }
289 #endif
290 }
void receiveDataMessage(toolbox::mem::Reference *ref)
SharedResourcesPtr sharedResources_
Signal rand(Signal arg)
Definition: vlib.cc:442
void addEventFragmentSample(const double bytecount)
void StorageManager::receiveDQMMessage ( toolbox::mem::Reference *  ref)
private

Callback for I2O message containing a DQM event (histogramms)

Definition at line 305 of file StorageManager.cc.

References stor::FragmentMonitorCollection::addDQMEventFragmentSample(), sharedResources_, and stor::I2OChain::totalDataSize().

Referenced by bindI2OCallbacks().

306 {
307  I2OChain i2oChain(ref);
308 
309  FragmentMonitorCollection& fragMonCollection =
310  sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
311  fragMonCollection.addDQMEventFragmentSample( i2oChain.totalDataSize() );
312 
313  sharedResources_->fragmentQueue_->enqWait(i2oChain);
314 }
SharedResourcesPtr sharedResources_
void addDQMEventFragmentSample(const double bytecount)
void StorageManager::receiveEndOfLumiSectionMessage ( toolbox::mem::Reference *  ref)
private

Callback for I2O message notifying the end-of-lumi-section

Definition at line 317 of file StorageManager.cc.

References stor::FragmentMonitorCollection::addFragmentSample(), stor::MonitoredQuantity::addSample(), stor::RunMonitorCollection::getEoLSSeenMQ(), stor::I2OChain::lumiSection(), sharedResources_, and stor::I2OChain::totalDataSize().

Referenced by bindI2OCallbacks().

318 {
319  I2OChain i2oChain( ref );
320 
321  FragmentMonitorCollection& fragMonCollection =
322  sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
323  fragMonCollection.addFragmentSample( i2oChain.totalDataSize() );
324 
325  RunMonitorCollection& runMonCollection =
326  sharedResources_->statisticsReporter_->getRunMonitorCollection();
327  runMonCollection.getEoLSSeenMQ().addSample( i2oChain.lumiSection() );
328 
329  sharedResources_->streamQueue_->enqWait( i2oChain );
330 }
void addSample(const double &value=1)
SharedResourcesPtr sharedResources_
const MonitoredQuantity & getEoLSSeenMQ() const
void addFragmentSample(const double bytecount)
void StorageManager::receiveErrorDataMessage ( toolbox::mem::Reference *  ref)
private

Callback for I2O message containing an error event

Definition at line 293 of file StorageManager.cc.

References stor::FragmentMonitorCollection::addEventFragmentSample(), sharedResources_, and stor::I2OChain::totalDataSize().

Referenced by bindI2OCallbacks().

294 {
295  I2OChain i2oChain(ref);
296 
297  FragmentMonitorCollection& fragMonCollection =
298  sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
299  fragMonCollection.addEventFragmentSample( i2oChain.totalDataSize() );
300 
301  sharedResources_->fragmentQueue_->enqWait(i2oChain);
302 }
SharedResourcesPtr sharedResources_
void addEventFragmentSample(const double bytecount)
void StorageManager::receiveRegistryMessage ( toolbox::mem::Reference *  ref)
private

Callback for I2O message containing an init message

Definition at line 253 of file StorageManager.cc.

References stor::MonitoredQuantity::addSample(), stor::FragmentMonitorCollection::getAllFragmentSizeMQ(), stor::ThroughputMonitorCollection::setMemoryPoolPointer(), sharedResources_, and stor::I2OChain::totalDataSize().

Referenced by bindI2OCallbacks().

254 {
255  I2OChain i2oChain(ref);
256 
257  // Set the I2O message pool pointer. Only done for init messages.
258  ThroughputMonitorCollection& throughputMonCollection =
259  sharedResources_->statisticsReporter_->getThroughputMonitorCollection();
260  throughputMonCollection.setMemoryPoolPointer( ref->getBuffer()->getPool() );
261 
262  FragmentMonitorCollection& fragMonCollection =
263  sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
264  fragMonCollection.getAllFragmentSizeMQ().addSample(
265  static_cast<double>( i2oChain.totalDataSize() ) / 0x100000
266  );
267 
268  sharedResources_->fragmentQueue_->enqWait(i2oChain);
269 }
const MonitoredQuantity & getAllFragmentSizeMQ() const
void addSample(const double &value=1)
SharedResourcesPtr sharedResources_
void setMemoryPoolPointer(toolbox::mem::Pool *)
void StorageManager::startWorkerThreads ( )
private

Create and start all worker threads

Definition at line 215 of file StorageManager.cc.

References diskWriter_, dqmEventProcessor_, alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, fragmentProcessor_, and sharedResources_.

Referenced by StorageManager().

216 {
217 
218  // Start the workloops
219  try
220  {
222  diskWriter_.reset( new DiskWriter(this, sharedResources_) );
224  sharedResources_->statisticsReporter_->startWorkLoop("theStatisticsReporter");
225  fragmentProcessor_->startWorkLoop("theFragmentProcessor");
226  diskWriter_->startWorkLoop("theDiskWriter");
227  dqmEventProcessor_->startWorkLoop("theDQMEventProcessor");
228  }
229  catch(xcept::Exception &e)
230  {
231  sharedResources_->alarmHandler_->moveToFailedState( e );
232  }
233  catch(std::exception &e)
234  {
235  XCEPT_DECLARE(stor::exception::Exception,
236  sentinelException, e.what());
237  sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
238  }
239  catch(...)
240  {
241  std::string errorMsg = "Unknown exception when starting the workloops";
242  XCEPT_DECLARE(stor::exception::Exception,
243  sentinelException, errorMsg);
244  sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
245  }
246 }
boost::scoped_ptr< DQMEventProcessor > dqmEventProcessor_
boost::scoped_ptr< DiskWriter > diskWriter_
SharedResourcesPtr sharedResources_
boost::scoped_ptr< FragmentProcessor > fragmentProcessor_
void StorageManager::storedDataWebPage ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Webinterface callback creating web page showing the stored data information

Definition at line 398 of file StorageManager.cc.

References alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

400 {
401  std::string errorMsg = "Failed to create the stored data webpage";
402 
403  try
404  {
405  smWebPageHelper_->storedDataWebPage(out);
406  }
407  catch(std::exception &e)
408  {
409  errorMsg += ": ";
410  errorMsg += e.what();
411 
412  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
413  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
414  }
415  catch(...)
416  {
417  errorMsg += ": Unknown exception";
418 
419  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
420  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
421  }
422 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void StorageManager::throughputWebPage ( xgi::Input in,
xgi::Output out 
)
throw (xgi::exception::Exception
)
private

Webinterface callback creating web page showing statistics about the data throughput in the SM.

Definition at line 574 of file StorageManager.cc.

References alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

576 {
577  std::string errorMsg = "Failed to create the throughput statistics webpage";
578 
579  try
580  {
581  smWebPageHelper_->throughputWebPage(out);
582  }
583  catch(std::exception &e)
584  {
585  errorMsg += ": ";
586  errorMsg += e.what();
587 
588  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
589  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
590  }
591  catch(...)
592  {
593  errorMsg += ": Unknown exception";
594 
595  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
596  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
597  }
598 
599 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_

Member Data Documentation

boost::scoped_ptr<ConsumerUtils_t> stor::StorageManager::consumerUtils_
private

Definition at line 231 of file StorageManager.h.

Referenced by initializeSharedResources().

boost::scoped_ptr<DiskWriter> stor::StorageManager::diskWriter_
private

Definition at line 227 of file StorageManager.h.

Referenced by startWorkerThreads().

boost::scoped_ptr<DQMEventProcessor> stor::StorageManager::dqmEventProcessor_
private

Definition at line 228 of file StorageManager.h.

Referenced by startWorkerThreads().

boost::scoped_ptr<FragmentProcessor> stor::StorageManager::fragmentProcessor_
private

Definition at line 226 of file StorageManager.h.

Referenced by startWorkerThreads().

SharedResourcesPtr stor::StorageManager::sharedResources_
private
boost::scoped_ptr<SMWebPageHelper> stor::StorageManager::smWebPageHelper_
private

Definition at line 232 of file StorageManager.h.

Referenced by initializeSharedResources().