15 #include "EventFilter/StorageManager/src/ConsumerUtils.icc"
21 #include "i2o/Method.h"
22 #include "interface/shared/version.h"
23 #include "interface/shared/i2oXFunctionCodes.h"
24 #include "xcept/tools.h"
25 #include "xdaq/NamespaceURI.h"
26 #include "xdata/InfoSpaceFactory.h"
27 #include "xgi/Method.h"
28 #include "xoap/Method.h"
30 #include <boost/lexical_cast.hpp>
31 #include <boost/shared_ptr.hpp>
39 StorageManager::StorageManager(xdaq::ApplicationStub *
s) :
42 LOG4CPLUS_INFO(this->getApplicationLogger(),
"Making StorageManager");
50 std::string errorMsg =
"Exception in StorageManager constructor: ";
58 LOG4CPLUS_FATAL( getApplicationLogger(), e.what() );
63 errorMsg +=
"unknown exception";
64 LOG4CPLUS_FATAL( getApplicationLogger(), errorMsg );
77 XDAQ_ORGANIZATION_ID);
81 XDAQ_ORGANIZATION_ID);
85 XDAQ_ORGANIZATION_ID);
89 XDAQ_ORGANIZATION_ID);
93 XDAQ_ORGANIZATION_ID);
154 xdata::InfoSpace *ispace = getApplicationInfoSpace();
155 unsigned long instance = getApplicationDescriptor()->getInstance();
184 getApplicationDescriptor(),
186 getDataSenderMonitorCollection()));
222 sharedResources_->statisticsReporter_->startWorkLoop(
"theStatisticsReporter");
234 sentinelException, e.what());
239 std::string errorMsg =
"Unknown exception when starting the workloops";
241 sentinelException, errorMsg);
280 #ifdef STOR_DEBUG_DUPLICATE_MESSAGES
281 double r =
rand()/
static_cast<double>(RAND_MAX);
284 LOG4CPLUS_INFO(this->getApplicationLogger(),
"Simulating duplicated data message");
338 smWebPageHelper_->css(
in,
out);
345 std::string errorMsg =
"Failed to create the default webpage";
349 smWebPageHelper_->defaultWebPage(
out);
354 errorMsg += e.what();
356 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
361 errorMsg +=
": Unknown exception";
363 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
372 std::string errorMsg =
"Failed to create the stored data webpage";
376 smWebPageHelper_->storedDataWebPage(
out);
381 errorMsg += e.what();
383 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
388 errorMsg +=
": Unknown exception";
390 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
401 std::string err_msg =
402 "Failed to create consumer statistics page";
406 smWebPageHelper_->consumerStatistics(
out);
412 LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
417 err_msg +=
": Unknown exception";
418 LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
428 std::string errorMsg =
"Failed to create the data sender webpage";
432 smWebPageHelper_->resourceBrokerOverview(
out);
437 errorMsg += e.what();
439 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
444 errorMsg +=
": Unknown exception";
446 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
456 std::string errorMsg =
"Failed to create the data sender webpage";
460 long long localRBID = 0;
461 cgicc::Cgicc cgiWrapper(
in);
462 cgicc::const_form_iterator updateRef = cgiWrapper.getElement(
"id");
463 if (updateRef != cgiWrapper.getElements().end())
465 std::string idString = updateRef->getValue();
466 localRBID = boost::lexical_cast<
long long>(idString);
469 smWebPageHelper_->resourceBrokerDetail(
out, localRBID);
474 errorMsg += e.what();
476 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
481 errorMsg +=
": Unknown exception";
483 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
493 std::string errorMsg =
"Failed to create the file statistics webpage";
497 smWebPageHelper_->filesWebPage(
out);
502 errorMsg += e.what();
504 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
509 errorMsg +=
": Unknown exception";
511 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
521 std::string errorMsg =
"Failed to create the DQM event statistics webpage";
525 smWebPageHelper_->dqmEventWebPage(
out);
530 errorMsg += e.what();
532 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
537 errorMsg +=
": Unknown exception";
539 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
548 std::string errorMsg =
"Failed to create the throughput statistics webpage";
552 smWebPageHelper_->throughputWebPage(
out);
557 errorMsg += e.what();
559 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
564 errorMsg +=
": Unknown exception";
566 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
577 out->getHTTPResponseHeader().addHeader(
"Content-Type",
578 "application/octet-stream" );
579 out->getHTTPResponseHeader().addHeader(
"Content-Transfer-Encoding",
582 out->write( &buff, 0 );
593 std::string errorMsg;
594 xoap::MessageReference returnMsg;
597 errorMsg =
"Failed to extract FSM event and parameters from SOAP message: ";
600 errorMsg =
"Failed to put a '" + command +
"' state machine event into command queue: ";
601 if (command ==
"Configure")
605 else if (command ==
"Enable")
607 if (sharedResources_->configuration_->streamConfigurationHasChanged())
613 else if (command ==
"Stop")
617 else if (command ==
"Halt")
621 else if (command ==
"EmergencyStop")
627 XCEPT_RAISE(stor::exception::StateMachine,
628 "Received an unknown state machine event '" + command +
"'.");
631 errorMsg =
"Failed to create FSM SOAP reply message: ";
633 sharedResources_->statisticsReporter_->
634 getStateMachineMonitorCollection().externallyVisibleState());
639 sentinelException, errorMsg);
640 sharedResources_->moveToFailedState( sentinelException );
641 throw sentinelException;
645 sentinelException, errorMsg, e);
646 sharedResources_->moveToFailedState( sentinelException );
647 throw sentinelException;
650 errorMsg += e.what();
652 sentinelException, errorMsg);
653 sharedResources_->moveToFailedState( sentinelException );
654 throw sentinelException;
657 errorMsg +=
"Unknown exception";
659 sentinelException, errorMsg);
660 sharedResources_->moveToFailedState( sentinelException );
661 throw sentinelException;
676 consumerUtils_->processConsumerRegistrationRequest(
in,
out);
684 consumerUtils_->processConsumerHeaderRequest(
in,
out);
692 consumerUtils_->processConsumerEventRequest(
in,
out);
700 consumerUtils_->processDQMConsumerRegistrationRequest(
in,
out);
708 consumerUtils_->processDQMConsumerEventRequest(
in,
out);
725
const MonitoredQuantity & getAllFragmentSizeMQ() const
void processConsumerRegistrationRequest(xgi::Input *in, xgi::Output *out)
unsigned int dqmEventQueueMemoryLimitMB_
boost::scoped_ptr< DQMEventProcessor > dqmEventProcessor_
void receiveEndOfLumiSectionMessage(toolbox::mem::Reference *ref)
boost::scoped_ptr< DiskWriter > diskWriter_
virtual std::string explainSelf() const
void receiveDataMessage(toolbox::mem::Reference *ref)
void addSample(const double &value=1)
ConsumerUtils< Configuration, EventQueueCollection > ConsumerUtils_t
unsigned int fragmentQueueMemoryLimitMB_
ConcurrentQueue< RegPtr > RegistrationQueue
void consumerStatisticsPage(xgi::Input *in, xgi::Output *out)
unsigned int streamQueueMemoryLimitMB_
unsigned int fragmentQueueSize_
SharedResourcesPtr sharedResources_
boost::shared_ptr< boost::statechart::event_base > EventPtr_t
boost::scoped_ptr< FragmentProcessor > fragmentProcessor_
void startWorkerThreads()
void consumerListWebPage(xgi::Input *in, xgi::Output *out)
void bindStateMachineCallbacks()
xoap::MessageReference createFsmSoapResponseMsg(const std::string commandName, const std::string currentState)
ConcurrentQueue< I2OChain, KeepNewest< I2OChain > > DQMEventQueue
ConcurrentQueue< I2OChain > StreamQueue
void processConsumerHeaderRequest(xgi::Input *in, xgi::Output *out)
uint32_t lumiSection() const
unsigned int registrationQueueSize_
void fileStatisticsWebPage(xgi::Input *in, xgi::Output *out)
void rbsenderDetailWebPage(xgi::Input *in, xgi::Output *out)
void initializeSharedResources()
void storedDataWebPage(xgi::Input *in, xgi::Output *out)
void setMemoryPoolPointer(toolbox::mem::Pool *)
const MonitoredQuantity & getEoLSSeenMQ() const
unsigned int dqmEventQueueSize_
void css(xgi::Input *in, xgi::Output *out)
void dqmEventStatisticsWebPage(xgi::Input *in, xgi::Output *out)
void processDQMConsumerEventRequest(xgi::Input *in, xgi::Output *out)
void bindConsumerCallbacks()
ConcurrentQueue< EventPtr_t > CommandQueue
QueueCollection< I2OChain > EventQueueCollection
unsigned long totalDataSize() const
QueueCollection< DQMTopLevelFolder::Record > DQMEventQueueCollection
void receiveErrorDataMessage(toolbox::mem::Reference *ref)
void rbsenderWebPage(xgi::Input *in, xgi::Output *out)
void processDQMConsumerRegistrationRequest(xgi::Input *in, xgi::Output *out)
std::string extractParameters(xoap::MessageReference, xdaq::Application *)
void receiveRegistryMessage(toolbox::mem::Reference *ref)
unsigned int commandQueueSize_
void bindWebInterfaceCallbacks()
void defaultWebPage(xgi::Input *in, xgi::Output *out)
void processConsumerEventRequest(xgi::Input *in, xgi::Output *out)
void throughputWebPage(xgi::Input *in, xgi::Output *out)
xoap::MessageReference handleFSMSoapMessage(xoap::MessageReference)
void addDQMEventFragmentSample(const double bytecount)
void reset(double vett[256])
ConcurrentQueue< I2OChain > FragmentQueue
boost::scoped_ptr< ConsumerUtils_t > consumerUtils_
void addEventFragmentSample(const double bytecount)
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void receiveDQMMessage(toolbox::mem::Reference *ref)
void addFragmentSample(const double bytecount)
unsigned int streamQueueSize_