15 #include "EventFilter/StorageManager/src/ConsumerUtils.icc"
21 #include "i2o/Method.h"
22 #include "interface/shared/version.h"
23 #include "interface/shared/i2oXFunctionCodes.h"
24 #include "xcept/tools.h"
25 #include "xdaq/NamespaceURI.h"
26 #include "xdata/InfoSpaceFactory.h"
27 #include "xgi/Method.h"
28 #include "xoap/Method.h"
30 #include <boost/lexical_cast.hpp>
31 #include <boost/shared_ptr.hpp>
39 StorageManager::StorageManager(xdaq::ApplicationStub *
s) :
42 LOG4CPLUS_INFO(this->getApplicationLogger(),
"Making StorageManager");
50 std::string errorMsg =
"Exception in StorageManager constructor: ";
58 LOG4CPLUS_FATAL( getApplicationLogger(), e.what() );
63 errorMsg +=
"unknown exception";
64 LOG4CPLUS_FATAL( getApplicationLogger(), errorMsg );
77 XDAQ_ORGANIZATION_ID);
81 XDAQ_ORGANIZATION_ID);
85 XDAQ_ORGANIZATION_ID);
89 XDAQ_ORGANIZATION_ID);
93 XDAQ_ORGANIZATION_ID);
155 xdata::InfoSpace *ispace = getApplicationInfoSpace();
156 unsigned long instance = getApplicationDescriptor()->getInstance();
186 getApplicationDescriptor(),
188 getDataSenderMonitorCollection()));
224 sharedResources_->statisticsReporter_->startWorkLoop(
"theStatisticsReporter");
236 sentinelException, e.what());
241 std::string errorMsg =
"Unknown exception when starting the workloops";
243 sentinelException, errorMsg);
282 #ifdef STOR_DEBUG_DUPLICATE_MESSAGES
283 double r =
rand()/
static_cast<double>(RAND_MAX);
286 LOG4CPLUS_INFO(this->getApplicationLogger(),
"Simulating duplicated data message");
340 smWebPageHelper_->css(
in,
out);
347 std::string errorMsg =
"Failed to create the default webpage";
351 smWebPageHelper_->defaultWebPage(
out);
356 errorMsg += e.what();
358 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
363 errorMsg +=
": Unknown exception";
365 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
374 std::string errorMsg =
"Failed to create the I2O input webpage";
378 smWebPageHelper_->inputWebPage(
out);
383 errorMsg += e.what();
385 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
390 errorMsg +=
": Unknown exception";
392 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
401 std::string errorMsg =
"Failed to create the stored data webpage";
405 smWebPageHelper_->storedDataWebPage(
out);
410 errorMsg += e.what();
412 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
417 errorMsg +=
": Unknown exception";
419 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
430 std::string err_msg =
431 "Failed to create consumer statistics page";
435 smWebPageHelper_->consumerStatistics(
out);
441 LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
446 err_msg +=
": Unknown exception";
447 LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
457 std::string errorMsg =
"Failed to create the data sender webpage";
461 smWebPageHelper_->resourceBrokerOverview(
out);
466 errorMsg += e.what();
468 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
473 errorMsg +=
": Unknown exception";
475 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
485 std::string errorMsg =
"Failed to create the data sender webpage";
489 long long localRBID = 0;
490 cgicc::Cgicc cgiWrapper(
in);
491 cgicc::const_form_iterator updateRef = cgiWrapper.getElement(
"id");
492 if (updateRef != cgiWrapper.getElements().end())
494 std::string idString = updateRef->getValue();
495 localRBID = boost::lexical_cast<
long long>(idString);
498 smWebPageHelper_->resourceBrokerDetail(
out, localRBID);
503 errorMsg += e.what();
505 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
510 errorMsg +=
": Unknown exception";
512 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
522 std::string errorMsg =
"Failed to create the file statistics webpage";
526 smWebPageHelper_->filesWebPage(
out);
531 errorMsg += e.what();
533 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
538 errorMsg +=
": Unknown exception";
540 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
550 std::string errorMsg =
"Failed to create the DQM event statistics webpage";
554 smWebPageHelper_->dqmEventWebPage(
out);
559 errorMsg += e.what();
561 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
566 errorMsg +=
": Unknown exception";
568 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
577 std::string errorMsg =
"Failed to create the throughput statistics webpage";
581 smWebPageHelper_->throughputWebPage(
out);
586 errorMsg += e.what();
588 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
593 errorMsg +=
": Unknown exception";
595 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
606 out->getHTTPResponseHeader().addHeader(
"Content-Type",
607 "application/octet-stream" );
608 out->getHTTPResponseHeader().addHeader(
"Content-Transfer-Encoding",
611 out->write( &buff, 0 );
622 std::string errorMsg;
623 xoap::MessageReference returnMsg;
626 errorMsg =
"Failed to extract FSM event and parameters from SOAP message: ";
629 errorMsg =
"Failed to put a '" + command +
"' state machine event into command queue: ";
630 if (command ==
"Configure")
634 else if (command ==
"Enable")
636 if (sharedResources_->configuration_->streamConfigurationHasChanged())
642 else if (command ==
"Stop")
646 else if (command ==
"Halt")
650 else if (command ==
"EmergencyStop")
656 XCEPT_RAISE(stor::exception::StateMachine,
657 "Received an unknown state machine event '" + command +
"'.");
660 errorMsg =
"Failed to create FSM SOAP reply message: ";
662 sharedResources_->statisticsReporter_->
663 getStateMachineMonitorCollection().externallyVisibleState());
668 sentinelException, errorMsg);
669 sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
670 throw sentinelException;
674 sentinelException, errorMsg, e);
675 sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
676 throw sentinelException;
679 errorMsg += e.what();
681 sentinelException, errorMsg);
682 sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
683 throw sentinelException;
686 errorMsg +=
"Unknown exception";
688 sentinelException, errorMsg);
689 sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
690 throw sentinelException;
705 consumerUtils_->processConsumerRegistrationRequest(
in,
out);
713 consumerUtils_->processConsumerHeaderRequest(
in,
out);
721 consumerUtils_->processConsumerEventRequest(
in,
out);
729 consumerUtils_->processDQMConsumerRegistrationRequest(
in,
out);
737 consumerUtils_->processDQMConsumerEventRequest(
in,
out);
749 writeHTTPHeaders( out );
751 #ifdef STOR_DEBUG_CORRUPTED_EVENT_HEADER
752 double r =
rand()/
static_cast<double>(RAND_MAX);
755 std::cout <<
"Simulating corrupted event header" << std::endl;
759 #endif // STOR_DEBUG_CORRUPTED_EVENT_HEADER
762 for (
unsigned int i = 0;
i < nfrags; ++
i )
764 const unsigned long len = evt.
dataSize(
i );
766 out->write( (
char*)location, len );
785 const MonitoredQuantity & getAllFragmentSizeMQ() const
void processConsumerRegistrationRequest(xgi::Input *in, xgi::Output *out)
unsigned int fragmentCount() const
unsigned int dqmEventQueueMemoryLimitMB_
boost::scoped_ptr< DQMEventProcessor > dqmEventProcessor_
void receiveEndOfLumiSectionMessage(toolbox::mem::Reference *ref)
boost::scoped_ptr< DiskWriter > diskWriter_
static PFTauRenderPlugin instance
void inputWebPage(xgi::Input *in, xgi::Output *out)
virtual std::string explainSelf() const
void receiveDataMessage(toolbox::mem::Reference *ref)
void addSample(const double &value=1)
ConsumerUtils< Configuration, EventQueueCollection > ConsumerUtils_t
unsigned int fragmentQueueMemoryLimitMB_
ConcurrentQueue< RegPtr > RegistrationQueue
void consumerStatisticsPage(xgi::Input *in, xgi::Output *out)
unsigned int streamQueueMemoryLimitMB_
unsigned int fragmentQueueSize_
SharedResourcesPtr sharedResources_
boost::shared_ptr< boost::statechart::event_base > EventPtr_t
boost::scoped_ptr< FragmentProcessor > fragmentProcessor_
void writeConsumerEvent(xgi::Output *, const stor::I2OChain &) const
void startWorkerThreads()
void consumerListWebPage(xgi::Input *in, xgi::Output *out)
void bindStateMachineCallbacks()
xoap::MessageReference createFsmSoapResponseMsg(const std::string commandName, const std::string currentState)
ConcurrentQueue< I2OChain, KeepNewest< I2OChain > > DQMEventQueue
ConcurrentQueue< I2OChain > StreamQueue
void processConsumerHeaderRequest(xgi::Input *in, xgi::Output *out)
uint32_t lumiSection() const
unsigned int registrationQueueSize_
void fileStatisticsWebPage(xgi::Input *in, xgi::Output *out)
void rbsenderDetailWebPage(xgi::Input *in, xgi::Output *out)
void initializeSharedResources()
void storedDataWebPage(xgi::Input *in, xgi::Output *out)
void setMemoryPoolPointer(toolbox::mem::Pool *)
const MonitoredQuantity & getEoLSSeenMQ() const
unsigned int dqmEventQueueSize_
void css(xgi::Input *in, xgi::Output *out)
void dqmEventStatisticsWebPage(xgi::Input *in, xgi::Output *out)
void processDQMConsumerEventRequest(xgi::Input *in, xgi::Output *out)
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
void bindConsumerCallbacks()
ConcurrentQueue< EventPtr_t > CommandQueue
QueueCollection< I2OChain > EventQueueCollection
unsigned long totalDataSize() const
QueueCollection< DQMTopLevelFolder::Record > DQMEventQueueCollection
void receiveErrorDataMessage(toolbox::mem::Reference *ref)
void rbsenderWebPage(xgi::Input *in, xgi::Output *out)
void processDQMConsumerRegistrationRequest(xgi::Input *in, xgi::Output *out)
unsigned long dataSize(int fragmentIndex) const
std::string extractParameters(xoap::MessageReference, xdaq::Application *)
void receiveRegistryMessage(toolbox::mem::Reference *ref)
unsigned int commandQueueSize_
void bindWebInterfaceCallbacks()
void defaultWebPage(xgi::Input *in, xgi::Output *out)
void processConsumerEventRequest(xgi::Input *in, xgi::Output *out)
void throughputWebPage(xgi::Input *in, xgi::Output *out)
xoap::MessageReference handleFSMSoapMessage(xoap::MessageReference)
void addDQMEventFragmentSample(const double bytecount)
void reset(double vett[256])
ConcurrentQueue< I2OChain > FragmentQueue
boost::scoped_ptr< ConsumerUtils_t > consumerUtils_
void addEventFragmentSample(const double bytecount)
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void receiveDQMMessage(toolbox::mem::Reference *ref)
void addFragmentSample(const double bytecount)
unsigned int streamQueueSize_
unsigned char * dataLocation(int fragmentIndex) const