CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Types | Private Member Functions | Private Attributes
stor::StorageManager Class Reference

#include <StorageManager.h>

Inheritance diagram for stor::StorageManager:

Public Member Functions

 StorageManager (xdaq::ApplicationStub *s)
 

Private Types

typedef ConsumerUtils
< Configuration,
EventQueueCollection > 
ConsumerUtils_t
 

Private Member Functions

void bindConsumerCallbacks ()
 
void bindI2OCallbacks ()
 
void bindStateMachineCallbacks ()
 
void bindWebInterfaceCallbacks ()
 
void consumerListWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void consumerStatisticsPage (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception )
 
void css (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void defaultWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void dqmEventStatisticsWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void fileStatisticsWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
xoap::MessageReference handleFSMSoapMessage (xoap::MessageReference) throw ( xoap::exception::Exception )
 
void initializeSharedResources ()
 
void inputWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
StorageManager & operator= (StorageManager const &)
 
void processConsumerEventRequest (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception )
 
void processConsumerHeaderRequest (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception )
 
void processConsumerRegistrationRequest (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception )
 
void processDQMConsumerEventRequest (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception )
 
void processDQMConsumerRegistrationRequest (xgi::Input *in, xgi::Output *out) throw ( xgi::exception::Exception )
 
void rbsenderDetailWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void rbsenderWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void receiveDataMessage (toolbox::mem::Reference *ref) throw (i2o::exception::Exception)
 
void receiveDQMMessage (toolbox::mem::Reference *ref) throw (i2o::exception::Exception)
 
void receiveEndOfLumiSectionMessage (toolbox::mem::Reference *ref) throw (i2o::exception::Exception)
 
void receiveErrorDataMessage (toolbox::mem::Reference *ref) throw (i2o::exception::Exception)
 
void receiveRegistryMessage (toolbox::mem::Reference *ref) throw (i2o::exception::Exception)
 
void startWorkerThreads ()
 
 StorageManager (StorageManager const &)
 
void storedDataWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void throughputWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 

Private Attributes

boost::scoped_ptr
< ConsumerUtils_t > 
consumerUtils_
 
boost::scoped_ptr< DiskWriter > diskWriter_
 
boost::scoped_ptr
< DQMEventProcessor > 
dqmEventProcessor_
 
boost::scoped_ptr
< FragmentProcessor > 
fragmentProcessor_
 
SharedResourcesPtr sharedResources_
 
boost::scoped_ptr
< SMWebPageHelper > 
smWebPageHelper_
 

Detailed Description

Main class of the StorageManager XDAQ application

Author:
eulisse
Revision:
1.62
Date:
2013/01/07 11:30:00

Definition at line 47 of file StorageManager.h.

Member Typedef Documentation

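typedef ConsumerUtils< Configuration, EventQueueCollection > stor::StorageManager::ConsumerUtils_t
private

Definition at line 231 of file StorageManager.h.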

Constructor & Destructor Documentation

StorageManager::StorageManager ( xdaq::ApplicationStub *  s)

Definition at line 38 of file StorageManager.cc.

References bindConsumerCallbacks(), bindI2OCallbacks(), bindStateMachineCallbacks(), bindWebInterfaceCallbacks(), alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, initializeSharedResources(), startWorkerThreads(), and AlCaHLTBitMon_QueryRunRegistry::string.

38  :
39  xdaq::Application(s)
40 {
41  LOG4CPLUS_INFO(this->getApplicationLogger(),"Making StorageManager");
42 
43  // bind all callback functions
48 
49  std::string errorMsg = "Exception in StorageManager constructor: ";
50  try
51  {
53  }
54  catch(std::exception &e)
55  {
56  errorMsg += e.what();
57  LOG4CPLUS_FATAL( getApplicationLogger(), e.what() );
58  XCEPT_RAISE( stor::exception::Exception, e.what() );
59  }
60  catch(...)
61  {
62  errorMsg += "unknown exception";
63  LOG4CPLUS_FATAL( getApplicationLogger(), errorMsg );
64  XCEPT_RAISE( stor::exception::Exception, errorMsg );
65  }
66 
68 }
stor::StorageManager::StorageManager ( StorageManager const &  )
private

Member Function Documentation

void StorageManager::bindConsumerCallbacks ( )
private

Bind callbacks for consumers

Definition at line 137 of file StorageManager.cc.

References processConsumerEventRequest(), processConsumerHeaderRequest(), processConsumerRegistrationRequest(), processDQMConsumerEventRequest(), and processDQMConsumerRegistrationRequest().

Referenced by StorageManager().

138 {
139  // event consumers
140  xgi::bind( this, &StorageManager::processConsumerRegistrationRequest, "registerConsumer" );
141  xgi::bind( this, &StorageManager::processConsumerHeaderRequest, "getregdata" );
142  xgi::bind( this, &StorageManager::processConsumerEventRequest, "geteventdata" );
143 
144  // dqm event consumers
145  xgi::bind(this,&StorageManager::processDQMConsumerRegistrationRequest, "registerDQMConsumer");
146  xgi::bind(this,&StorageManager::processDQMConsumerEventRequest, "getDQMeventdata");
147 }
void processConsumerRegistrationRequest(xgi::Input *in, xgi::Output *out)
void processConsumerHeaderRequest(xgi::Input *in, xgi::Output *out)
void processDQMConsumerEventRequest(xgi::Input *in, xgi::Output *out)
void processDQMConsumerRegistrationRequest(xgi::Input *in, xgi::Output *out)
void processConsumerEventRequest(xgi::Input *in, xgi::Output *out)
void StorageManager::bindI2OCallbacks ( )
private

Bind callbacks for I2O messages

Definition at line 71 of file StorageManager.cc.

References I2O_SM_DATA, I2O_SM_DQM, I2O_SM_ERROR, I2O_SM_PREAMBLE, receiveDataMessage(), receiveDQMMessage(), receiveEndOfLumiSectionMessage(), receiveErrorDataMessage(), and receiveRegistryMessage().

Referenced by StorageManager().

72 {
73  i2o::bind(this,
76  XDAQ_ORGANIZATION_ID);
77  i2o::bind(this,
80  XDAQ_ORGANIZATION_ID);
81  i2o::bind(this,
84  XDAQ_ORGANIZATION_ID);
85  i2o::bind(this,
87  I2O_SM_DQM,
88  XDAQ_ORGANIZATION_ID);
89  i2o::bind(this,
91  I2O_EVM_LUMISECTION,
92  XDAQ_ORGANIZATION_ID);
93 }
#define I2O_SM_ERROR
Definition: i2oEvfMsgs.h:23
#define I2O_SM_DQM
Definition: i2oEvfMsgs.h:25
void receiveEndOfLumiSectionMessage(toolbox::mem::Reference *ref)
#define I2O_SM_PREAMBLE
Definition: i2oEvfMsgs.h:21
void receiveDataMessage(toolbox::mem::Reference *ref)
void receiveErrorDataMessage(toolbox::mem::Reference *ref)
void receiveRegistryMessage(toolbox::mem::Reference *ref)
#define I2O_SM_DATA
Definition: i2oEvfMsgs.h:22
void receiveDQMMessage(toolbox::mem::Reference *ref)
void StorageManager::bindStateMachineCallbacks ( )
private

Bind callbacks for state machine SOAP messages

Definition at line 96 of file StorageManager.cc.

References handleFSMSoapMessage().

Referenced by StorageManager().

97 {
98  xoap::bind( this,
100  "Configure",
101  XDAQ_NS_URI );
102  xoap::bind( this,
104  "Enable",
105  XDAQ_NS_URI );
106  xoap::bind( this,
108  "Stop",
109  XDAQ_NS_URI );
110  xoap::bind( this,
112  "Halt",
113  XDAQ_NS_URI );
114  xoap::bind( this,
116  "EmergencyStop",
117  XDAQ_NS_URI );
118 }
xoap::MessageReference handleFSMSoapMessage(xoap::MessageReference)
void StorageManager::bindWebInterfaceCallbacks ( )
private

Bind callbacks for web interface

Definition at line 121 of file StorageManager.cc.

References consumerListWebPage(), consumerStatisticsPage(), css(), defaultWebPage(), dqmEventStatisticsWebPage(), fileStatisticsWebPage(), inputWebPage(), rbsenderDetailWebPage(), rbsenderWebPage(), storedDataWebPage(), and throughputWebPage().

Referenced by StorageManager().

122 {
123  xgi::bind(this,&StorageManager::css, "styles.css");
124  xgi::bind(this,&StorageManager::defaultWebPage, "Default");
125  xgi::bind(this,&StorageManager::inputWebPage, "input");
126  xgi::bind(this,&StorageManager::storedDataWebPage, "storedData");
127  xgi::bind(this,&StorageManager::rbsenderWebPage, "rbsenderlist");
128  xgi::bind(this,&StorageManager::rbsenderDetailWebPage, "rbsenderdetail");
129  xgi::bind(this,&StorageManager::fileStatisticsWebPage, "fileStatistics");
130  xgi::bind(this,&StorageManager::dqmEventStatisticsWebPage,"dqmEventStatistics");
131  xgi::bind(this,&StorageManager::consumerStatisticsPage, "consumerStatistics" );
132  xgi::bind(this,&StorageManager::consumerListWebPage, "consumerList");
133  xgi::bind(this,&StorageManager::throughputWebPage, "throughputStatistics");
134 }
void inputWebPage(xgi::Input *in, xgi::Output *out)
void consumerStatisticsPage(xgi::Input *in, xgi::Output *out)
void consumerListWebPage(xgi::Input *in, xgi::Output *out)
void fileStatisticsWebPage(xgi::Input *in, xgi::Output *out)
void rbsenderDetailWebPage(xgi::Input *in, xgi::Output *out)
void storedDataWebPage(xgi::Input *in, xgi::Output *out)
void css(xgi::Input *in, xgi::Output *out)
void dqmEventStatisticsWebPage(xgi::Input *in, xgi::Output *out)
void rbsenderWebPage(xgi::Input *in, xgi::Output *out)
void defaultWebPage(xgi::Input *in, xgi::Output *out)
void throughputWebPage(xgi::Input *in, xgi::Output *out)
void StorageManager::consumerListWebPage ( xgi::Input * in,
xgi::Output * out
)
throw (xgi::exception::Exception
)
private

Callback returning an XML list of consumer information. The current implementation just returns an empty document.

Definition at line 602 of file StorageManager.cc.

References dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

604 {
605  out->getHTTPResponseHeader().addHeader( "Content-Type",
606  "application/octet-stream" );
607  out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
608  "binary" );
609  char buff;
610  out->write( &buff, 0 );
611 }
tuple out
Definition: dbtoconf.py:99
void StorageManager::consumerStatisticsPage ( xgi::Input * in,
xgi::Output * out
)
throw (xgi::exception::Exception
)
private

Webinterface callback creating web page showing the connected consumers

Definition at line 424 of file StorageManager.cc.

References alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, dbtoconf::out, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by bindWebInterfaceCallbacks().

427 {
428 
429  std::string err_msg =
430  "Failed to create consumer statistics page";
431 
432  try
433  {
434  smWebPageHelper_->consumerStatistics(out);
435  }
436  catch( std::exception &e )
437  {
438  err_msg += ": ";
439  err_msg += e.what();
440  LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
441  XCEPT_RAISE( xgi::exception::Exception, err_msg );
442  }
443  catch(...)
444  {
445  err_msg += ": Unknown exception";
446  LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
447  XCEPT_RAISE( xgi::exception::Exception, err_msg );
448  }
449 
450 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void StorageManager::css ( xgi::Input * in,
xgi::Output * out
)
throw (xgi::exception::Exception
)
private

Webinterface callback for style sheet

Definition at line 336 of file StorageManager.cc.

References recoMuon::in, and dbtoconf::out.

Referenced by bindWebInterfaceCallbacks().

338 {
339  smWebPageHelper_->css(in,out);
340 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void StorageManager::defaultWebPage ( xgi::Input * in,
xgi::Output * out
)
throw (xgi::exception::Exception
)
private

Webinterface callback creating default web page

Definition at line 343 of file StorageManager.cc.

References alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, dbtoconf::out, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by bindWebInterfaceCallbacks().

345 {
346  std::string errorMsg = "Failed to create the default webpage";
347 
348  try
349  {
350  smWebPageHelper_->defaultWebPage(out);
351  }
352  catch(std::exception &e)
353  {
354  errorMsg += ": ";
355  errorMsg += e.what();
356 
357  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
358  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
359  }
360  catch(...)
361  {
362  errorMsg += ": Unknown exception";
363 
364  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
365  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
366  }
367 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void StorageManager::dqmEventStatisticsWebPage ( xgi::Input * in,
xgi::Output * out
)
throw (xgi::exception::Exception
)
private

Webinterface callback creating web page showing statistics about the processed DQM events.

Definition at line 546 of file StorageManager.cc.

References alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, dbtoconf::out, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by bindWebInterfaceCallbacks().

548 {
549  std::string errorMsg = "Failed to create the DQM event statistics webpage";
550 
551  try
552  {
553  smWebPageHelper_->dqmEventWebPage(out);
554  }
555  catch(std::exception &e)
556  {
557  errorMsg += ": ";
558  errorMsg += e.what();
559 
560  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
561  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
562  }
563  catch(...)
564  {
565  errorMsg += ": Unknown exception";
566 
567  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
568  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
569  }
570 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void StorageManager::fileStatisticsWebPage ( xgi::Input * in,
xgi::Output * out
)
throw (xgi::exception::Exception
)
private

Webinterface callback creating web page showing information about recently written files

Definition at line 518 of file StorageManager.cc.

References alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, dbtoconf::out, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by bindWebInterfaceCallbacks().

520 {
521  std::string errorMsg = "Failed to create the file statistics webpage";
522 
523  try
524  {
525  smWebPageHelper_->filesWebPage(out);
526  }
527  catch(std::exception &e)
528  {
529  errorMsg += ": ";
530  errorMsg += e.what();
531 
532  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
533  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
534  }
535  catch(...)
536  {
537  errorMsg += ": Unknown exception";
538 
539  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
540  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
541  }
542 
543 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
xoap::MessageReference StorageManager::handleFSMSoapMessage ( xoap::MessageReference  msg)
throw (xoap::exception::Exception
)
private

Callback for SOAP message containing a state machine event, possibly including new configuration values

Definition at line 618 of file StorageManager.cc.

References edmPickEvents::command, stor::soaputils::createFsmSoapResponseMsg(), alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, cms::Exception::explainSelf(), stor::soaputils::extractParameters(), lumiQueryAPI::msg, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by bindStateMachineCallbacks().

620 {
621  std::string errorMsg;
622  xoap::MessageReference returnMsg;
623 
624  try {
625  errorMsg = "Failed to extract FSM event and parameters from SOAP message: ";
627 
628  errorMsg = "Failed to put a '" + command + "' state machine event into command queue: ";
629  if (command == "Configure")
630  {
631  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Configure() ) );
632  }
633  else if (command == "Enable")
634  {
635  if (sharedResources_->configuration_->streamConfigurationHasChanged())
636  {
637  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Reconfigure() ) );
638  }
639  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Enable() ) );
640  }
641  else if (command == "Stop")
642  {
643  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Stop() ) );
644  }
645  else if (command == "Halt")
646  {
647  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Halt() ) );
648  }
649  else if (command == "EmergencyStop")
650  {
651  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::EmergencyStop() ) );
652  }
653  else
654  {
655  XCEPT_RAISE(stor::exception::StateMachine,
656  "Received an unknown state machine event '" + command + "'.");
657  }
658 
659  errorMsg = "Failed to create FSM SOAP reply message: ";
660  returnMsg = soaputils::createFsmSoapResponseMsg(command,
661  sharedResources_->statisticsReporter_->
662  getStateMachineMonitorCollection().externallyVisibleState());
663  }
664  catch (cms::Exception& e) {
665  errorMsg += e.explainSelf();
666  XCEPT_DECLARE(xoap::exception::Exception,
667  sentinelException, errorMsg);
668  sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
669  throw sentinelException;
670  }
671  catch (xcept::Exception &e) {
672  XCEPT_DECLARE_NESTED(xoap::exception::Exception,
673  sentinelException, errorMsg, e);
674  sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
675  throw sentinelException;
676  }
677  catch (std::exception& e) {
678  errorMsg += e.what();
679  XCEPT_DECLARE(xoap::exception::Exception,
680  sentinelException, errorMsg);
681  sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
682  throw sentinelException;
683  }
684  catch (...) {
685  errorMsg += "Unknown exception";
686  XCEPT_DECLARE(xoap::exception::Exception,
687  sentinelException, errorMsg);
688  sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
689  throw sentinelException;
690  }
691 
692  return returnMsg;
693 }
virtual std::string explainSelf() const
Definition: Exception.cc:146
SharedResourcesPtr sharedResources_
boost::shared_ptr< boost::statechart::event_base > EventPtr_t
Definition: CommandQueue.h:21
xoap::MessageReference createFsmSoapResponseMsg(const std::string commandName, const std::string currentState)
Definition: SoapUtils.cc:38
std::string extractParameters(xoap::MessageReference, xdaq::Application *)
Definition: SoapUtils.cc:25
void StorageManager::initializeSharedResources ( )
private

Initialize the shared resources

Definition at line 150 of file StorageManager.cc.

References stor::QueueConfigurationParams::commandQueueSize_, edm::errors::Configuration, consumerUtils_, stor::QueueConfigurationParams::dqmEventQueueMemoryLimitMB_, stor::QueueConfigurationParams::dqmEventQueueSize_, stor::QueueConfigurationParams::fragmentQueueMemoryLimitMB_, stor::QueueConfigurationParams::fragmentQueueSize_, instance, stor::QueueConfigurationParams::registrationQueueSize_, reset(), sharedResources_, smWebPageHelper_, stor::QueueConfigurationParams::streamQueueMemoryLimitMB_, and stor::QueueConfigurationParams::streamQueueSize_.

Referenced by StorageManager().

151 {
152  sharedResources_.reset(new SharedResources());
153 
154  xdata::InfoSpace *ispace = getApplicationInfoSpace();
155  unsigned long instance = getApplicationDescriptor()->getInstance();
156  sharedResources_->configuration_.reset(new Configuration(ispace, instance));
157 
158  QueueConfigurationParams queueParams =
159  sharedResources_->configuration_->getQueueConfigurationParams();
160  sharedResources_->commandQueue_.
161  reset(new CommandQueue(queueParams.commandQueueSize_));
162  sharedResources_->fragmentQueue_.
163  reset(new FragmentQueue(queueParams.fragmentQueueSize_, queueParams.fragmentQueueMemoryLimitMB_ * 1024*1024));
164  sharedResources_->registrationQueue_.
166  sharedResources_->streamQueue_.
167  reset(new StreamQueue(queueParams.streamQueueSize_, queueParams.streamQueueMemoryLimitMB_ * 1024*1024));
168  sharedResources_->dqmEventQueue_.
169  reset(new DQMEventQueue(queueParams.dqmEventQueueSize_, queueParams.dqmEventQueueMemoryLimitMB_ * 1024*1024));
170 
171  sharedResources_->alarmHandler_.reset( new AlarmHandler(this, sharedResources_) );
172  sharedResources_->statisticsReporter_.reset(
174  );
175  sharedResources_->initMsgCollection_.reset(new InitMsgCollection());
176  sharedResources_->diskWriterResources_.reset(new DiskWriterResources());
177  sharedResources_->dqmEventProcessorResources_.reset(new DQMEventProcessorResources());
178 
179  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setFragmentQueue(sharedResources_->fragmentQueue_);
180  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setStreamQueue(sharedResources_->streamQueue_);
181  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setDQMEventQueue(sharedResources_->dqmEventQueue_);
182 
184  discardManager_.reset(new DiscardManager(getApplicationContext(),
185  getApplicationDescriptor(),
186  sharedResources_->statisticsReporter_->
187  getDataSenderMonitorCollection()));
188 
189  sharedResources_->registrationCollection_.reset( new RegistrationCollection() );
191  sharedResources_->statisticsReporter_->getEventConsumerMonitorCollection();
192  sharedResources_->eventQueueCollection_.reset( new EventQueueCollection( ecmc ) );
193 
195  sharedResources_->statisticsReporter_->getDQMConsumerMonitorCollection();
196  sharedResources_->dqmEventQueueCollection_.reset( new DQMEventQueueCollection( dcmc ) );
197 
198  consumerUtils_.reset( new ConsumerUtils_t(
199  sharedResources_->configuration_,
200  sharedResources_->registrationCollection_,
201  sharedResources_->registrationQueue_,
202  sharedResources_->initMsgCollection_,
203  sharedResources_->eventQueueCollection_,
204  sharedResources_->dqmEventQueueCollection_,
205  sharedResources_->alarmHandler_
206  ) );
207 
209  getApplicationDescriptor(), sharedResources_));
210 
211 }
unsigned int dqmEventQueueMemoryLimitMB_
Definition: Configuration.h:91
static PFTauRenderPlugin instance
ConsumerUtils< Configuration, EventQueueCollection > ConsumerUtils_t
unsigned int fragmentQueueMemoryLimitMB_
Definition: Configuration.h:93
ConcurrentQueue< RegPtr > RegistrationQueue
unsigned int streamQueueMemoryLimitMB_
Definition: Configuration.h:96
SharedResourcesPtr sharedResources_
ConcurrentQueue< I2OChain, KeepNewest< I2OChain > > DQMEventQueue
Definition: DQMEventQueue.h:22
ConcurrentQueue< I2OChain > StreamQueue
Definition: StreamQueue.h:21
ConcurrentQueue< EventPtr_t > CommandQueue
Definition: CommandQueue.h:22
QueueCollection< I2OChain > EventQueueCollection
QueueCollection< DQMTopLevelFolder::Record > DQMEventQueueCollection
void reset(double vett[256])
Definition: TPedValues.cc:11
ConcurrentQueue< I2OChain > FragmentQueue
Definition: FragmentQueue.h:21
boost::scoped_ptr< ConsumerUtils_t > consumerUtils_
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void StorageManager::inputWebPage ( xgi::Input * in,
xgi::Output * out
)
throw (xgi::exception::Exception
)
private

Webinterface callback creating web page showing the I2O input information

Definition at line 370 of file StorageManager.cc.

References alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, dbtoconf::out, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by bindWebInterfaceCallbacks().

372 {
373  std::string errorMsg = "Failed to create the I2O input webpage";
374 
375  try
376  {
377  smWebPageHelper_->inputWebPage(out);
378  }
379  catch(std::exception &e)
380  {
381  errorMsg += ": ";
382  errorMsg += e.what();
383 
384  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
385  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
386  }
387  catch(...)
388  {
389  errorMsg += ": Unknown exception";
390 
391  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
392  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
393  }
394 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
StorageManager& stor::StorageManager::operator= ( StorageManager const &  )
private
void StorageManager::processConsumerEventRequest ( xgi::Input * in,
xgi::Output * out
)
throw (xgi::exception::Exception
)
private

Callback handling event consumer event request

Definition at line 717 of file StorageManager.cc.

References recoMuon::in, and dbtoconf::out.

Referenced by bindConsumerCallbacks().

719 {
720  consumerUtils_->processConsumerEventRequest(in,out);
721 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< ConsumerUtils_t > consumerUtils_
void StorageManager::processConsumerHeaderRequest ( xgi::Input * in,
xgi::Output * out
)
throw (xgi::exception::Exception
)
private

Callback handling event consumer init message request

Definition at line 709 of file StorageManager.cc.

References recoMuon::in, and dbtoconf::out.

Referenced by bindConsumerCallbacks().

711 {
712  consumerUtils_->processConsumerHeaderRequest(in,out);
713 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< ConsumerUtils_t > consumerUtils_
void StorageManager::processConsumerRegistrationRequest ( xgi::Input * in,
xgi::Output * out
)
throw (xgi::exception::Exception
)
private

Callback handling event consumer registration request

Definition at line 701 of file StorageManager.cc.

References recoMuon::in, and dbtoconf::out.

Referenced by bindConsumerCallbacks().

703 {
704  consumerUtils_->processConsumerRegistrationRequest(in,out);
705 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< ConsumerUtils_t > consumerUtils_
void StorageManager::processDQMConsumerEventRequest ( xgi::Input * in,
xgi::Output * out
)
throw (xgi::exception::Exception
)
private

Callback handling DQM event consumer DQM event request

Definition at line 733 of file StorageManager.cc.

References recoMuon::in, and dbtoconf::out.

Referenced by bindConsumerCallbacks().

735 {
736  consumerUtils_->processDQMConsumerEventRequest(in,out);
737 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< ConsumerUtils_t > consumerUtils_
void StorageManager::processDQMConsumerRegistrationRequest ( xgi::Input * in,
xgi::Output * out
)
throw (xgi::exception::Exception
)
private

Callback handling DQM event consumer registration request

Definition at line 725 of file StorageManager.cc.

References recoMuon::in, and dbtoconf::out.

Referenced by bindConsumerCallbacks().

727 {
728  consumerUtils_->processDQMConsumerRegistrationRequest(in,out);
729 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< ConsumerUtils_t > consumerUtils_
void StorageManager::rbsenderDetailWebPage ( xgi::Input * in,
xgi::Output * out
)
throw (xgi::exception::Exception
)
private

Webinterface callback creating web page showing detailed information about the resource broker sending data.

Definition at line 481 of file StorageManager.cc.

References alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, recoMuon::in, dbtoconf::out, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by bindWebInterfaceCallbacks().

483 {
484  std::string errorMsg = "Failed to create the data sender webpage";
485 
486  try
487  {
488  long long localRBID = 0;
489  cgicc::Cgicc cgiWrapper(in);
490  cgicc::const_form_iterator updateRef = cgiWrapper.getElement("id");
491  if (updateRef != cgiWrapper.getElements().end())
492  {
493  std::string idString = updateRef->getValue();
494  localRBID = boost::lexical_cast<long long>(idString);
495  }
496 
497  smWebPageHelper_->resourceBrokerDetail(out, localRBID);
498  }
499  catch(std::exception &e)
500  {
501  errorMsg += ": ";
502  errorMsg += e.what();
503 
504  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
505  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
506  }
507  catch(...)
508  {
509  errorMsg += ": Unknown exception";
510 
511  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
512  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
513  }
514 
515 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void StorageManager::rbsenderWebPage ( xgi::Input * in,
xgi::Output * out
)
throw (xgi::exception::Exception
)
private

Webinterface callback creating web page showing summary information about the resource broker sending data.

Definition at line 453 of file StorageManager.cc.

References alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, dbtoconf::out, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by bindWebInterfaceCallbacks().

455 {
456  std::string errorMsg = "Failed to create the data sender webpage";
457 
458  try
459  {
460  smWebPageHelper_->resourceBrokerOverview(out);
461  }
462  catch(std::exception &e)
463  {
464  errorMsg += ": ";
465  errorMsg += e.what();
466 
467  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
468  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
469  }
470  catch(...)
471  {
472  errorMsg += ": Unknown exception";
473 
474  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
475  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
476  }
477 
478 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void StorageManager::receiveDataMessage ( toolbox::mem::Reference *  ref)
throw (i2o::exception::Exception
)
private

Callback for I2O message containing an event

Definition at line 271 of file StorageManager.cc.

References stor::FragmentMonitorCollection::addEventFragmentSample(), alignCSCRings::r, rand(), and stor::I2OChain::totalDataSize().

Referenced by bindI2OCallbacks().

272 {
273  I2OChain i2oChain(ref);
274 
275  FragmentMonitorCollection& fragMonCollection =
276  sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
277  fragMonCollection.addEventFragmentSample( i2oChain.totalDataSize() );
278 
279  sharedResources_->fragmentQueue_->enqWait(i2oChain);
280 
281 #ifdef STOR_DEBUG_DUPLICATE_MESSAGES
282  double r = rand()/static_cast<double>(RAND_MAX);
283  if (r < 0.001)
284  {
285  LOG4CPLUS_INFO(this->getApplicationLogger(), "Simulating duplicated data message");
286  receiveDataMessage(ref->duplicate());
287  }
288 #endif
289 }
void receiveDataMessage(toolbox::mem::Reference *ref)
SharedResourcesPtr sharedResources_
Signal rand(Signal arg)
Definition: vlib.cc:442
void addEventFragmentSample(const double bytecount)
void StorageManager::receiveDQMMessage ( toolbox::mem::Reference *  ref)
throw (i2o::exception::Exception
)
private

Callback for I2O message containing a DQM event (histograms)

Definition at line 304 of file StorageManager.cc.

References stor::FragmentMonitorCollection::addDQMEventFragmentSample(), and stor::I2OChain::totalDataSize().

Referenced by bindI2OCallbacks().

305 {
306  I2OChain i2oChain(ref);
307 
308  FragmentMonitorCollection& fragMonCollection =
309  sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
310  fragMonCollection.addDQMEventFragmentSample( i2oChain.totalDataSize() );
311 
312  sharedResources_->fragmentQueue_->enqWait(i2oChain);
313 }
SharedResourcesPtr sharedResources_
void addDQMEventFragmentSample(const double bytecount)
void StorageManager::receiveEndOfLumiSectionMessage ( toolbox::mem::Reference *  ref)
throw (i2o::exception::Exception
)
private

Callback for I2O message notifying the end-of-lumi-section

Definition at line 316 of file StorageManager.cc.

References stor::FragmentMonitorCollection::addFragmentSample(), stor::MonitoredQuantity::addSample(), stor::RunMonitorCollection::getEoLSSeenMQ(), stor::I2OChain::lumiSection(), and stor::I2OChain::totalDataSize().

Referenced by bindI2OCallbacks().

317 {
318  I2OChain i2oChain( ref );
319 
320  FragmentMonitorCollection& fragMonCollection =
321  sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
322  fragMonCollection.addFragmentSample( i2oChain.totalDataSize() );
323 
324  RunMonitorCollection& runMonCollection =
325  sharedResources_->statisticsReporter_->getRunMonitorCollection();
326  runMonCollection.getEoLSSeenMQ().addSample( i2oChain.lumiSection() );
327 
328  sharedResources_->streamQueue_->enqWait( i2oChain );
329 }
void addSample(const double &value=1)
SharedResourcesPtr sharedResources_
const MonitoredQuantity & getEoLSSeenMQ() const
void addFragmentSample(const double bytecount)
void StorageManager::receiveErrorDataMessage ( toolbox::mem::Reference *  ref)
throw (i2o::exception::Exception
)
private

Callback for I2O message containing an error event

Definition at line 292 of file StorageManager.cc.

References stor::FragmentMonitorCollection::addEventFragmentSample(), and stor::I2OChain::totalDataSize().

Referenced by bindI2OCallbacks().

293 {
294  I2OChain i2oChain(ref);
295 
296  FragmentMonitorCollection& fragMonCollection =
297  sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
298  fragMonCollection.addEventFragmentSample( i2oChain.totalDataSize() );
299 
300  sharedResources_->fragmentQueue_->enqWait(i2oChain);
301 }
SharedResourcesPtr sharedResources_
void addEventFragmentSample(const double bytecount)
void StorageManager::receiveRegistryMessage ( toolbox::mem::Reference *  ref)
throw (i2o::exception::Exception
)
private

Callback for I2O message containing an init message

Definition at line 252 of file StorageManager.cc.

References stor::MonitoredQuantity::addSample(), stor::FragmentMonitorCollection::getAllFragmentSizeMQ(), stor::ThroughputMonitorCollection::setMemoryPoolPointer(), and stor::I2OChain::totalDataSize().

Referenced by bindI2OCallbacks().

253 {
254  I2OChain i2oChain(ref);
255 
256  // Set the I2O message pool pointer. Only done for init messages.
257  ThroughputMonitorCollection& throughputMonCollection =
258  sharedResources_->statisticsReporter_->getThroughputMonitorCollection();
259  throughputMonCollection.setMemoryPoolPointer( ref->getBuffer()->getPool() );
260 
261  FragmentMonitorCollection& fragMonCollection =
262  sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
263  fragMonCollection.getAllFragmentSizeMQ().addSample(
264  static_cast<double>( i2oChain.totalDataSize() ) / 0x100000
265  );
266 
267  sharedResources_->fragmentQueue_->enqWait(i2oChain);
268 }
const MonitoredQuantity & getAllFragmentSizeMQ() const
void addSample(const double &value=1)
SharedResourcesPtr sharedResources_
void setMemoryPoolPointer(toolbox::mem::Pool *)
void StorageManager::startWorkerThreads ( )
private

Create and start all worker threads

Definition at line 214 of file StorageManager.cc.

References diskWriter_, dqmEventProcessor_, alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, fragmentProcessor_, sharedResources_, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by StorageManager().

215 {
216 
217  // Start the workloops
218  try
219  {
221  diskWriter_.reset( new DiskWriter(this, sharedResources_) );
223  sharedResources_->statisticsReporter_->startWorkLoop("theStatisticsReporter");
224  fragmentProcessor_->startWorkLoop("theFragmentProcessor");
225  diskWriter_->startWorkLoop("theDiskWriter");
226  dqmEventProcessor_->startWorkLoop("theDQMEventProcessor");
227  }
228  catch(xcept::Exception &e)
229  {
230  sharedResources_->alarmHandler_->moveToFailedState( e );
231  }
232  catch(std::exception &e)
233  {
234  XCEPT_DECLARE(stor::exception::Exception,
235  sentinelException, e.what());
236  sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
237  }
238  catch(...)
239  {
240  std::string errorMsg = "Unknown exception when starting the workloops";
241  XCEPT_DECLARE(stor::exception::Exception,
242  sentinelException, errorMsg);
243  sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
244  }
245 }
boost::scoped_ptr< DQMEventProcessor > dqmEventProcessor_
boost::scoped_ptr< DiskWriter > diskWriter_
SharedResourcesPtr sharedResources_
boost::scoped_ptr< FragmentProcessor > fragmentProcessor_
void StorageManager::storedDataWebPage ( xgi::Input *  in,
xgi::Output *  out 
)
throw (xgi::exception::Exception
)
private

Web interface callback creating the web page that shows the stored data information

Definition at line 397 of file StorageManager.cc.

References alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, dbtoconf::out, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by bindWebInterfaceCallbacks().

399 {
400  std::string errorMsg = "Failed to create the stored data webpage";
401 
402  try
403  {
404  smWebPageHelper_->storedDataWebPage(out);
405  }
406  catch(std::exception &e)
407  {
408  errorMsg += ": ";
409  errorMsg += e.what();
410 
411  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
412  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
413  }
414  catch(...)
415  {
416  errorMsg += ": Unknown exception";
417 
418  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
419  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
420  }
421 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void StorageManager::throughputWebPage ( xgi::Input *  in,
xgi::Output *  out 
)
throw (xgi::exception::Exception
)
private

Web interface callback creating the web page that shows statistics about the data throughput in the SM.

Definition at line 573 of file StorageManager.cc.

References alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, dbtoconf::out, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by bindWebInterfaceCallbacks().

575 {
576  std::string errorMsg = "Failed to create the throughput statistics webpage";
577 
578  try
579  {
580  smWebPageHelper_->throughputWebPage(out);
581  }
582  catch(std::exception &e)
583  {
584  errorMsg += ": ";
585  errorMsg += e.what();
586 
587  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
588  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
589  }
590  catch(...)
591  {
592  errorMsg += ": Unknown exception";
593 
594  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
595  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
596  }
597 
598 }
tuple out
Definition: dbtoconf.py:99
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_

Member Data Documentation

boost::scoped_ptr<ConsumerUtils_t> stor::StorageManager::consumerUtils_
private

Definition at line 232 of file StorageManager.h.

Referenced by initializeSharedResources().

boost::scoped_ptr<DiskWriter> stor::StorageManager::diskWriter_
private

Definition at line 228 of file StorageManager.h.

Referenced by startWorkerThreads().

boost::scoped_ptr<DQMEventProcessor> stor::StorageManager::dqmEventProcessor_
private

Definition at line 229 of file StorageManager.h.

Referenced by startWorkerThreads().

boost::scoped_ptr<FragmentProcessor> stor::StorageManager::fragmentProcessor_
private

Definition at line 227 of file StorageManager.h.

Referenced by startWorkerThreads().

SharedResourcesPtr stor::StorageManager::sharedResources_
private

Definition at line 225 of file StorageManager.h.

Referenced by initializeSharedResources(), and startWorkerThreads().

boost::scoped_ptr<SMWebPageHelper> stor::StorageManager::smWebPageHelper_
private

Definition at line 233 of file StorageManager.h.

Referenced by initializeSharedResources().