15 #include "EventFilter/StorageManager/src/ConsumerUtils.icc"
21 #include "interface/shared/version.h"
22 #include "interface/shared/i2oXFunctionCodes.h"
23 #include "xcept/tools.h"
24 #include "xdaq/NamespaceURI.h"
25 #include "xdata/InfoSpaceFactory.h"
26 #include "xgi/Method.h"
27 #include "xoap/Method.h"
29 #include <boost/lexical_cast.hpp>
30 #include <boost/shared_ptr.hpp>
38 StorageManager::StorageManager(xdaq::ApplicationStub *
s) :
41 LOG4CPLUS_INFO(this->getApplicationLogger(),
"Making StorageManager");
49 std::string errorMsg =
"Exception in StorageManager constructor: ";
57 LOG4CPLUS_FATAL( getApplicationLogger(), e.what() );
62 errorMsg +=
"unknown exception";
63 LOG4CPLUS_FATAL( getApplicationLogger(), errorMsg );
76 XDAQ_ORGANIZATION_ID);
80 XDAQ_ORGANIZATION_ID);
84 XDAQ_ORGANIZATION_ID);
88 XDAQ_ORGANIZATION_ID);
92 XDAQ_ORGANIZATION_ID);
154 xdata::InfoSpace *ispace = getApplicationInfoSpace();
155 unsigned long instance = getApplicationDescriptor()->getInstance();
185 getApplicationDescriptor(),
187 getDataSenderMonitorCollection()));
223 sharedResources_->statisticsReporter_->startWorkLoop(
"theStatisticsReporter");
235 sentinelException, e.what());
240 std::string errorMsg =
"Unknown exception when starting the workloops";
242 sentinelException, errorMsg);
258 sharedResources_->statisticsReporter_->getThroughputMonitorCollection();
262 sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
267 sharedResources_->fragmentQueue_->enqWait(i2oChain);
276 sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
279 sharedResources_->fragmentQueue_->enqWait(i2oChain);
281 #ifdef STOR_DEBUG_DUPLICATE_MESSAGES
282 double r =
rand()/
static_cast<double>(RAND_MAX);
285 LOG4CPLUS_INFO(this->getApplicationLogger(),
"Simulating duplicated data message");
286 receiveDataMessage(ref->duplicate());
297 sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
300 sharedResources_->fragmentQueue_->enqWait(i2oChain);
309 sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
312 sharedResources_->fragmentQueue_->enqWait(i2oChain);
321 sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
325 sharedResources_->statisticsReporter_->getRunMonitorCollection();
328 sharedResources_->streamQueue_->enqWait( i2oChain );
339 smWebPageHelper_->css(
in,
out);
346 std::string errorMsg =
"Failed to create the default webpage";
350 smWebPageHelper_->defaultWebPage(
out);
355 errorMsg += e.what();
357 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
362 errorMsg +=
": Unknown exception";
364 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
373 std::string errorMsg =
"Failed to create the I2O input webpage";
377 smWebPageHelper_->inputWebPage(
out);
382 errorMsg += e.what();
384 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
389 errorMsg +=
": Unknown exception";
391 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
400 std::string errorMsg =
"Failed to create the stored data webpage";
404 smWebPageHelper_->storedDataWebPage(
out);
409 errorMsg += e.what();
411 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
416 errorMsg +=
": Unknown exception";
418 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
430 "Failed to create consumer statistics page";
434 smWebPageHelper_->consumerStatistics(
out);
440 LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
445 err_msg +=
": Unknown exception";
446 LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
456 std::string errorMsg =
"Failed to create the data sender webpage";
460 smWebPageHelper_->resourceBrokerOverview(
out);
465 errorMsg += e.what();
467 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
472 errorMsg +=
": Unknown exception";
474 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
484 std::string errorMsg =
"Failed to create the data sender webpage";
488 long long localRBID = 0;
489 cgicc::Cgicc cgiWrapper(
in);
490 cgicc::const_form_iterator updateRef = cgiWrapper.getElement(
"id");
491 if (updateRef != cgiWrapper.getElements().end())
494 localRBID = boost::lexical_cast<
long long>(idString);
497 smWebPageHelper_->resourceBrokerDetail(
out, localRBID);
502 errorMsg += e.what();
504 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
509 errorMsg +=
": Unknown exception";
511 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
521 std::string errorMsg =
"Failed to create the file statistics webpage";
525 smWebPageHelper_->filesWebPage(
out);
530 errorMsg += e.what();
532 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
537 errorMsg +=
": Unknown exception";
539 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
549 std::string errorMsg =
"Failed to create the DQM event statistics webpage";
553 smWebPageHelper_->dqmEventWebPage(
out);
558 errorMsg += e.what();
560 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
565 errorMsg +=
": Unknown exception";
567 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
576 std::string errorMsg =
"Failed to create the throughput statistics webpage";
580 smWebPageHelper_->throughputWebPage(
out);
585 errorMsg += e.what();
587 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
592 errorMsg +=
": Unknown exception";
594 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
605 out->getHTTPResponseHeader().addHeader(
"Content-Type",
606 "application/octet-stream" );
607 out->getHTTPResponseHeader().addHeader(
"Content-Transfer-Encoding",
610 out->write( &buff, 0 );
622 xoap::MessageReference returnMsg;
625 errorMsg =
"Failed to extract FSM event and parameters from SOAP message: ";
628 errorMsg =
"Failed to put a '" + command +
"' state machine event into command queue: ";
629 if (command ==
"Configure")
633 else if (command ==
"Enable")
635 if (sharedResources_->configuration_->streamConfigurationHasChanged())
641 else if (command ==
"Stop")
645 else if (command ==
"Halt")
649 else if (command ==
"EmergencyStop")
655 XCEPT_RAISE(stor::exception::StateMachine,
656 "Received an unknown state machine event '" + command +
"'.");
659 errorMsg =
"Failed to create FSM SOAP reply message: ";
661 sharedResources_->statisticsReporter_->
662 getStateMachineMonitorCollection().externallyVisibleState());
667 sentinelException, errorMsg);
668 sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
669 throw sentinelException;
673 sentinelException, errorMsg, e);
674 sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
675 throw sentinelException;
678 errorMsg += e.what();
680 sentinelException, errorMsg);
681 sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
682 throw sentinelException;
685 errorMsg +=
"Unknown exception";
687 sentinelException, errorMsg);
688 sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
689 throw sentinelException;
704 consumerUtils_->processConsumerRegistrationRequest(
in,
out);
712 consumerUtils_->processConsumerHeaderRequest(
in,
out);
720 consumerUtils_->processConsumerEventRequest(
in,
out);
728 consumerUtils_->processDQMConsumerRegistrationRequest(
in,
out);
736 consumerUtils_->processDQMConsumerEventRequest(
in,
out);
748 writeHTTPHeaders( out );
750 #ifdef STOR_DEBUG_CORRUPTED_EVENT_HEADER
751 double r =
rand()/
static_cast<double>(RAND_MAX);
754 std::cout <<
"Simulating corrupted event header" << std::endl;
758 #endif // STOR_DEBUG_CORRUPTED_EVENT_HEADER
761 for (
unsigned int i = 0;
i < nfrags; ++
i )
763 const unsigned long len = evt.
dataSize(
i );
765 out->write( (
char*)location, len );
784 const MonitoredQuantity & getAllFragmentSizeMQ() const
void processConsumerRegistrationRequest(xgi::Input *in, xgi::Output *out)
unsigned int fragmentCount() const
unsigned int dqmEventQueueMemoryLimitMB_
boost::scoped_ptr< DQMEventProcessor > dqmEventProcessor_
void receiveEndOfLumiSectionMessage(toolbox::mem::Reference *ref)
boost::scoped_ptr< DiskWriter > diskWriter_
static PFTauRenderPlugin instance
void inputWebPage(xgi::Input *in, xgi::Output *out)
virtual std::string explainSelf() const
void receiveDataMessage(toolbox::mem::Reference *ref)
void addSample(const double &value=1)
ConsumerUtils< Configuration, EventQueueCollection > ConsumerUtils_t
unsigned int fragmentQueueMemoryLimitMB_
ConcurrentQueue< RegPtr > RegistrationQueue
void consumerStatisticsPage(xgi::Input *in, xgi::Output *out)
unsigned int streamQueueMemoryLimitMB_
unsigned int fragmentQueueSize_
SharedResourcesPtr sharedResources_
boost::shared_ptr< boost::statechart::event_base > EventPtr_t
boost::scoped_ptr< FragmentProcessor > fragmentProcessor_
void writeConsumerEvent(xgi::Output *, const stor::I2OChain &) const
void startWorkerThreads()
void consumerListWebPage(xgi::Input *in, xgi::Output *out)
void bindStateMachineCallbacks()
xoap::MessageReference createFsmSoapResponseMsg(const std::string commandName, const std::string currentState)
ConcurrentQueue< I2OChain, KeepNewest< I2OChain > > DQMEventQueue
ConcurrentQueue< I2OChain > StreamQueue
void processConsumerHeaderRequest(xgi::Input *in, xgi::Output *out)
uint32_t lumiSection() const
unsigned int registrationQueueSize_
void fileStatisticsWebPage(xgi::Input *in, xgi::Output *out)
void rbsenderDetailWebPage(xgi::Input *in, xgi::Output *out)
void initializeSharedResources()
void storedDataWebPage(xgi::Input *in, xgi::Output *out)
void setMemoryPoolPointer(toolbox::mem::Pool *)
const MonitoredQuantity & getEoLSSeenMQ() const
unsigned int dqmEventQueueSize_
void css(xgi::Input *in, xgi::Output *out)
void dqmEventStatisticsWebPage(xgi::Input *in, xgi::Output *out)
void processDQMConsumerEventRequest(xgi::Input *in, xgi::Output *out)
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
void bindConsumerCallbacks()
ConcurrentQueue< EventPtr_t > CommandQueue
QueueCollection< I2OChain > EventQueueCollection
unsigned long totalDataSize() const
QueueCollection< DQMTopLevelFolder::Record > DQMEventQueueCollection
void receiveErrorDataMessage(toolbox::mem::Reference *ref)
void rbsenderWebPage(xgi::Input *in, xgi::Output *out)
void processDQMConsumerRegistrationRequest(xgi::Input *in, xgi::Output *out)
unsigned long dataSize(int fragmentIndex) const
std::string extractParameters(xoap::MessageReference, xdaq::Application *)
void receiveRegistryMessage(toolbox::mem::Reference *ref)
unsigned int commandQueueSize_
void bindWebInterfaceCallbacks()
void defaultWebPage(xgi::Input *in, xgi::Output *out)
void processConsumerEventRequest(xgi::Input *in, xgi::Output *out)
void throughputWebPage(xgi::Input *in, xgi::Output *out)
xoap::MessageReference handleFSMSoapMessage(xoap::MessageReference)
void addDQMEventFragmentSample(const double bytecount)
void reset(double vett[256])
ConcurrentQueue< I2OChain > FragmentQueue
boost::scoped_ptr< ConsumerUtils_t > consumerUtils_
void addEventFragmentSample(const double bytecount)
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void receiveDQMMessage(toolbox::mem::Reference *ref)
void addFragmentSample(const double bytecount)
unsigned int streamQueueSize_
unsigned char * dataLocation(int fragmentIndex) const