00001
00003
00004 #include "EventFilter/StorageManager/interface/DiskWriter.h"
00005 #include "EventFilter/StorageManager/interface/DiskWriterResources.h"
00006 #include "EventFilter/StorageManager/interface/DQMEventProcessor.h"
00007 #include "EventFilter/StorageManager/interface/DQMEventProcessorResources.h"
00008 #include "EventFilter/StorageManager/interface/DiscardManager.h"
00009 #include "EventFilter/StorageManager/interface/Exception.h"
00010 #include "EventFilter/StorageManager/interface/FragmentMonitorCollection.h"
00011 #include "EventFilter/StorageManager/interface/FragmentProcessor.h"
00012 #include "EventFilter/StorageManager/interface/SoapUtils.h"
00013 #include "EventFilter/StorageManager/interface/StorageManager.h"
00014 #include "EventFilter/StorageManager/interface/StateMachine.h"
00015 #include "EventFilter/StorageManager/src/ConsumerUtils.icc"
00016
00017 #include "EventFilter/Utilities/interface/i2oEvfMsgs.h"
00018
00019 #include "FWCore/Utilities/interface/EDMException.h"
00020
00021 #include "interface/shared/version.h"
00022 #include "interface/shared/i2oXFunctionCodes.h"
00023 #include "xcept/tools.h"
00024 #include "xdaq/NamespaceURI.h"
00025 #include "xdata/InfoSpaceFactory.h"
00026 #include "xgi/Method.h"
00027 #include "xoap/Method.h"
00028
00029 #include <boost/lexical_cast.hpp>
00030 #include <boost/shared_ptr.hpp>
00031
00032 #include <cstdlib>
00033
00034 using namespace std;
00035 using namespace stor;
00036
00037
00038 StorageManager::StorageManager(xdaq::ApplicationStub * s) :
00039 xdaq::Application(s)
00040 {
00041 LOG4CPLUS_INFO(this->getApplicationLogger(),"Making StorageManager");
00042
00043
00044 bindI2OCallbacks();
00045 bindStateMachineCallbacks();
00046 bindWebInterfaceCallbacks();
00047 bindConsumerCallbacks();
00048
00049 std::string errorMsg = "Exception in StorageManager constructor: ";
00050 try
00051 {
00052 initializeSharedResources();
00053 }
00054 catch(std::exception &e)
00055 {
00056 errorMsg += e.what();
00057 LOG4CPLUS_FATAL( getApplicationLogger(), e.what() );
00058 XCEPT_RAISE( stor::exception::Exception, e.what() );
00059 }
00060 catch(...)
00061 {
00062 errorMsg += "unknown exception";
00063 LOG4CPLUS_FATAL( getApplicationLogger(), errorMsg );
00064 XCEPT_RAISE( stor::exception::Exception, errorMsg );
00065 }
00066
00067 startWorkerThreads();
00068 }
00069
00070
00071 void StorageManager::bindI2OCallbacks()
00072 {
00073 i2o::bind(this,
00074 &StorageManager::receiveRegistryMessage,
00075 I2O_SM_PREAMBLE,
00076 XDAQ_ORGANIZATION_ID);
00077 i2o::bind(this,
00078 &StorageManager::receiveDataMessage,
00079 I2O_SM_DATA,
00080 XDAQ_ORGANIZATION_ID);
00081 i2o::bind(this,
00082 &StorageManager::receiveErrorDataMessage,
00083 I2O_SM_ERROR,
00084 XDAQ_ORGANIZATION_ID);
00085 i2o::bind(this,
00086 &StorageManager::receiveDQMMessage,
00087 I2O_SM_DQM,
00088 XDAQ_ORGANIZATION_ID);
00089 i2o::bind(this,
00090 &StorageManager::receiveEndOfLumiSectionMessage,
00091 I2O_EVM_LUMISECTION,
00092 XDAQ_ORGANIZATION_ID);
00093 }
00094
00095
00096 void StorageManager::bindStateMachineCallbacks()
00097 {
00098 xoap::bind( this,
00099 &StorageManager::handleFSMSoapMessage,
00100 "Configure",
00101 XDAQ_NS_URI );
00102 xoap::bind( this,
00103 &StorageManager::handleFSMSoapMessage,
00104 "Enable",
00105 XDAQ_NS_URI );
00106 xoap::bind( this,
00107 &StorageManager::handleFSMSoapMessage,
00108 "Stop",
00109 XDAQ_NS_URI );
00110 xoap::bind( this,
00111 &StorageManager::handleFSMSoapMessage,
00112 "Halt",
00113 XDAQ_NS_URI );
00114 xoap::bind( this,
00115 &StorageManager::handleFSMSoapMessage,
00116 "EmergencyStop",
00117 XDAQ_NS_URI );
00118 }
00119
00120
00121 void StorageManager::bindWebInterfaceCallbacks()
00122 {
00123 xgi::bind(this,&StorageManager::css, "styles.css");
00124 xgi::bind(this,&StorageManager::defaultWebPage, "Default");
00125 xgi::bind(this,&StorageManager::inputWebPage, "input");
00126 xgi::bind(this,&StorageManager::storedDataWebPage, "storedData");
00127 xgi::bind(this,&StorageManager::rbsenderWebPage, "rbsenderlist");
00128 xgi::bind(this,&StorageManager::rbsenderDetailWebPage, "rbsenderdetail");
00129 xgi::bind(this,&StorageManager::fileStatisticsWebPage, "fileStatistics");
00130 xgi::bind(this,&StorageManager::dqmEventStatisticsWebPage,"dqmEventStatistics");
00131 xgi::bind(this,&StorageManager::consumerStatisticsPage, "consumerStatistics" );
00132 xgi::bind(this,&StorageManager::consumerListWebPage, "consumerList");
00133 xgi::bind(this,&StorageManager::throughputWebPage, "throughputStatistics");
00134 }
00135
00136
00137 void StorageManager::bindConsumerCallbacks()
00138 {
00139
00140 xgi::bind( this, &StorageManager::processConsumerRegistrationRequest, "registerConsumer" );
00141 xgi::bind( this, &StorageManager::processConsumerHeaderRequest, "getregdata" );
00142 xgi::bind( this, &StorageManager::processConsumerEventRequest, "geteventdata" );
00143
00144
00145 xgi::bind(this,&StorageManager::processDQMConsumerRegistrationRequest, "registerDQMConsumer");
00146 xgi::bind(this,&StorageManager::processDQMConsumerEventRequest, "getDQMeventdata");
00147 }
00148
00149
00150 void StorageManager::initializeSharedResources()
00151 {
00152 sharedResources_.reset(new SharedResources());
00153
00154 xdata::InfoSpace *ispace = getApplicationInfoSpace();
00155 unsigned long instance = getApplicationDescriptor()->getInstance();
00156 sharedResources_->configuration_.reset(new Configuration(ispace, instance));
00157
00158 QueueConfigurationParams queueParams =
00159 sharedResources_->configuration_->getQueueConfigurationParams();
00160 sharedResources_->commandQueue_.
00161 reset(new CommandQueue(queueParams.commandQueueSize_));
00162 sharedResources_->fragmentQueue_.
00163 reset(new FragmentQueue(queueParams.fragmentQueueSize_, queueParams.fragmentQueueMemoryLimitMB_ * 1024*1024));
00164 sharedResources_->registrationQueue_.
00165 reset(new RegistrationQueue(queueParams.registrationQueueSize_));
00166 sharedResources_->streamQueue_.
00167 reset(new StreamQueue(queueParams.streamQueueSize_, queueParams.streamQueueMemoryLimitMB_ * 1024*1024));
00168 sharedResources_->dqmEventQueue_.
00169 reset(new DQMEventQueue(queueParams.dqmEventQueueSize_, queueParams.dqmEventQueueMemoryLimitMB_ * 1024*1024));
00170
00171 sharedResources_->alarmHandler_.reset( new AlarmHandler(this, sharedResources_) );
00172 sharedResources_->statisticsReporter_.reset(
00173 new StatisticsReporter(this, sharedResources_)
00174 );
00175 sharedResources_->initMsgCollection_.reset(new InitMsgCollection());
00176 sharedResources_->diskWriterResources_.reset(new DiskWriterResources());
00177 sharedResources_->dqmEventProcessorResources_.reset(new DQMEventProcessorResources());
00178
00179 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setFragmentQueue(sharedResources_->fragmentQueue_);
00180 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setStreamQueue(sharedResources_->streamQueue_);
00181 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setDQMEventQueue(sharedResources_->dqmEventQueue_);
00182
00183 sharedResources_->
00184 discardManager_.reset(new DiscardManager(getApplicationContext(),
00185 getApplicationDescriptor(),
00186 sharedResources_->statisticsReporter_->
00187 getDataSenderMonitorCollection()));
00188
00189 sharedResources_->registrationCollection_.reset( new RegistrationCollection() );
00190 EventConsumerMonitorCollection& ecmc =
00191 sharedResources_->statisticsReporter_->getEventConsumerMonitorCollection();
00192 sharedResources_->eventQueueCollection_.reset( new EventQueueCollection( ecmc ) );
00193
00194 DQMConsumerMonitorCollection& dcmc =
00195 sharedResources_->statisticsReporter_->getDQMConsumerMonitorCollection();
00196 sharedResources_->dqmEventQueueCollection_.reset( new DQMEventQueueCollection( dcmc ) );
00197
00198 consumerUtils_.reset( new ConsumerUtils_t(
00199 sharedResources_->configuration_,
00200 sharedResources_->registrationCollection_,
00201 sharedResources_->registrationQueue_,
00202 sharedResources_->initMsgCollection_,
00203 sharedResources_->eventQueueCollection_,
00204 sharedResources_->dqmEventQueueCollection_,
00205 sharedResources_->alarmHandler_
00206 ) );
00207
00208 smWebPageHelper_.reset( new SMWebPageHelper(
00209 getApplicationDescriptor(), sharedResources_));
00210
00211 }
00212
00213
00214 void StorageManager::startWorkerThreads()
00215 {
00216
00217
00218 try
00219 {
00220 fragmentProcessor_.reset( new FragmentProcessor( this, sharedResources_ ) );
00221 diskWriter_.reset( new DiskWriter(this, sharedResources_) );
00222 dqmEventProcessor_.reset( new DQMEventProcessor(this, sharedResources_) );
00223 sharedResources_->statisticsReporter_->startWorkLoop("theStatisticsReporter");
00224 fragmentProcessor_->startWorkLoop("theFragmentProcessor");
00225 diskWriter_->startWorkLoop("theDiskWriter");
00226 dqmEventProcessor_->startWorkLoop("theDQMEventProcessor");
00227 }
00228 catch(xcept::Exception &e)
00229 {
00230 sharedResources_->alarmHandler_->moveToFailedState( e );
00231 }
00232 catch(std::exception &e)
00233 {
00234 XCEPT_DECLARE(stor::exception::Exception,
00235 sentinelException, e.what());
00236 sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00237 }
00238 catch(...)
00239 {
00240 std::string errorMsg = "Unknown exception when starting the workloops";
00241 XCEPT_DECLARE(stor::exception::Exception,
00242 sentinelException, errorMsg);
00243 sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00244 }
00245 }
00246
00247
00249
00251
00252 void StorageManager::receiveRegistryMessage(toolbox::mem::Reference *ref) throw (i2o::exception::Exception)
00253 {
00254 I2OChain i2oChain(ref);
00255
00256
00257 ThroughputMonitorCollection& throughputMonCollection =
00258 sharedResources_->statisticsReporter_->getThroughputMonitorCollection();
00259 throughputMonCollection.setMemoryPoolPointer( ref->getBuffer()->getPool() );
00260
00261 FragmentMonitorCollection& fragMonCollection =
00262 sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00263 fragMonCollection.getAllFragmentSizeMQ().addSample(
00264 static_cast<double>( i2oChain.totalDataSize() ) / 0x100000
00265 );
00266
00267 sharedResources_->fragmentQueue_->enqWait(i2oChain);
00268 }
00269
00270
00271 void StorageManager::receiveDataMessage(toolbox::mem::Reference *ref) throw (i2o::exception::Exception)
00272 {
00273 I2OChain i2oChain(ref);
00274
00275 FragmentMonitorCollection& fragMonCollection =
00276 sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00277 fragMonCollection.addEventFragmentSample( i2oChain.totalDataSize() );
00278
00279 sharedResources_->fragmentQueue_->enqWait(i2oChain);
00280
00281 #ifdef STOR_DEBUG_DUPLICATE_MESSAGES
00282 double r = rand()/static_cast<double>(RAND_MAX);
00283 if (r < 0.001)
00284 {
00285 LOG4CPLUS_INFO(this->getApplicationLogger(), "Simulating duplicated data message");
00286 receiveDataMessage(ref->duplicate());
00287 }
00288 #endif
00289 }
00290
00291
00292 void StorageManager::receiveErrorDataMessage(toolbox::mem::Reference *ref) throw (i2o::exception::Exception)
00293 {
00294 I2OChain i2oChain(ref);
00295
00296 FragmentMonitorCollection& fragMonCollection =
00297 sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00298 fragMonCollection.addEventFragmentSample( i2oChain.totalDataSize() );
00299
00300 sharedResources_->fragmentQueue_->enqWait(i2oChain);
00301 }
00302
00303
00304 void StorageManager::receiveDQMMessage(toolbox::mem::Reference *ref) throw (i2o::exception::Exception)
00305 {
00306 I2OChain i2oChain(ref);
00307
00308 FragmentMonitorCollection& fragMonCollection =
00309 sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00310 fragMonCollection.addDQMEventFragmentSample( i2oChain.totalDataSize() );
00311
00312 sharedResources_->fragmentQueue_->enqWait(i2oChain);
00313 }
00314
00315
00316 void StorageManager::receiveEndOfLumiSectionMessage(toolbox::mem::Reference *ref) throw (i2o::exception::Exception)
00317 {
00318 I2OChain i2oChain( ref );
00319
00320 FragmentMonitorCollection& fragMonCollection =
00321 sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00322 fragMonCollection.addFragmentSample( i2oChain.totalDataSize() );
00323
00324 RunMonitorCollection& runMonCollection =
00325 sharedResources_->statisticsReporter_->getRunMonitorCollection();
00326 runMonCollection.getEoLSSeenMQ().addSample( i2oChain.lumiSection() );
00327
00328 sharedResources_->streamQueue_->enqWait( i2oChain );
00329 }
00330
00331
00333
00335
00336 void StorageManager::css(xgi::Input *in, xgi::Output *out)
00337 throw (xgi::exception::Exception)
00338 {
00339 smWebPageHelper_->css(in,out);
00340 }
00341
00342
00343 void StorageManager::defaultWebPage(xgi::Input *in, xgi::Output *out)
00344 throw (xgi::exception::Exception)
00345 {
00346 std::string errorMsg = "Failed to create the default webpage";
00347
00348 try
00349 {
00350 smWebPageHelper_->defaultWebPage(out);
00351 }
00352 catch(std::exception &e)
00353 {
00354 errorMsg += ": ";
00355 errorMsg += e.what();
00356
00357 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00358 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00359 }
00360 catch(...)
00361 {
00362 errorMsg += ": Unknown exception";
00363
00364 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00365 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00366 }
00367 }
00368
00369
00370 void StorageManager::inputWebPage(xgi::Input *in, xgi::Output *out)
00371 throw (xgi::exception::Exception)
00372 {
00373 std::string errorMsg = "Failed to create the I2O input webpage";
00374
00375 try
00376 {
00377 smWebPageHelper_->inputWebPage(out);
00378 }
00379 catch(std::exception &e)
00380 {
00381 errorMsg += ": ";
00382 errorMsg += e.what();
00383
00384 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00385 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00386 }
00387 catch(...)
00388 {
00389 errorMsg += ": Unknown exception";
00390
00391 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00392 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00393 }
00394 }
00395
00396
00397 void StorageManager::storedDataWebPage(xgi::Input *in, xgi::Output *out)
00398 throw (xgi::exception::Exception)
00399 {
00400 std::string errorMsg = "Failed to create the stored data webpage";
00401
00402 try
00403 {
00404 smWebPageHelper_->storedDataWebPage(out);
00405 }
00406 catch(std::exception &e)
00407 {
00408 errorMsg += ": ";
00409 errorMsg += e.what();
00410
00411 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00412 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00413 }
00414 catch(...)
00415 {
00416 errorMsg += ": Unknown exception";
00417
00418 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00419 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00420 }
00421 }
00422
00423
00424 void StorageManager::consumerStatisticsPage( xgi::Input* in,
00425 xgi::Output* out )
00426 throw( xgi::exception::Exception )
00427 {
00428
00429 std::string err_msg =
00430 "Failed to create consumer statistics page";
00431
00432 try
00433 {
00434 smWebPageHelper_->consumerStatistics(out);
00435 }
00436 catch( std::exception &e )
00437 {
00438 err_msg += ": ";
00439 err_msg += e.what();
00440 LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
00441 XCEPT_RAISE( xgi::exception::Exception, err_msg );
00442 }
00443 catch(...)
00444 {
00445 err_msg += ": Unknown exception";
00446 LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
00447 XCEPT_RAISE( xgi::exception::Exception, err_msg );
00448 }
00449
00450 }
00451
00452
00453 void StorageManager::rbsenderWebPage(xgi::Input *in, xgi::Output *out)
00454 throw (xgi::exception::Exception)
00455 {
00456 std::string errorMsg = "Failed to create the data sender webpage";
00457
00458 try
00459 {
00460 smWebPageHelper_->resourceBrokerOverview(out);
00461 }
00462 catch(std::exception &e)
00463 {
00464 errorMsg += ": ";
00465 errorMsg += e.what();
00466
00467 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00468 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00469 }
00470 catch(...)
00471 {
00472 errorMsg += ": Unknown exception";
00473
00474 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00475 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00476 }
00477
00478 }
00479
00480
00481 void StorageManager::rbsenderDetailWebPage(xgi::Input *in, xgi::Output *out)
00482 throw (xgi::exception::Exception)
00483 {
00484 std::string errorMsg = "Failed to create the data sender webpage";
00485
00486 try
00487 {
00488 long long localRBID = 0;
00489 cgicc::Cgicc cgiWrapper(in);
00490 cgicc::const_form_iterator updateRef = cgiWrapper.getElement("id");
00491 if (updateRef != cgiWrapper.getElements().end())
00492 {
00493 std::string idString = updateRef->getValue();
00494 localRBID = boost::lexical_cast<long long>(idString);
00495 }
00496
00497 smWebPageHelper_->resourceBrokerDetail(out, localRBID);
00498 }
00499 catch(std::exception &e)
00500 {
00501 errorMsg += ": ";
00502 errorMsg += e.what();
00503
00504 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00505 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00506 }
00507 catch(...)
00508 {
00509 errorMsg += ": Unknown exception";
00510
00511 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00512 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00513 }
00514
00515 }
00516
00517
00518 void StorageManager::fileStatisticsWebPage(xgi::Input *in, xgi::Output *out)
00519 throw (xgi::exception::Exception)
00520 {
00521 std::string errorMsg = "Failed to create the file statistics webpage";
00522
00523 try
00524 {
00525 smWebPageHelper_->filesWebPage(out);
00526 }
00527 catch(std::exception &e)
00528 {
00529 errorMsg += ": ";
00530 errorMsg += e.what();
00531
00532 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00533 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00534 }
00535 catch(...)
00536 {
00537 errorMsg += ": Unknown exception";
00538
00539 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00540 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00541 }
00542
00543 }
00544
00545
00546 void StorageManager::dqmEventStatisticsWebPage(xgi::Input *in, xgi::Output *out)
00547 throw (xgi::exception::Exception)
00548 {
00549 std::string errorMsg = "Failed to create the DQM event statistics webpage";
00550
00551 try
00552 {
00553 smWebPageHelper_->dqmEventWebPage(out);
00554 }
00555 catch(std::exception &e)
00556 {
00557 errorMsg += ": ";
00558 errorMsg += e.what();
00559
00560 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00561 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00562 }
00563 catch(...)
00564 {
00565 errorMsg += ": Unknown exception";
00566
00567 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00568 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00569 }
00570 }
00571
00572
00573 void StorageManager::throughputWebPage(xgi::Input *in, xgi::Output *out)
00574 throw (xgi::exception::Exception)
00575 {
00576 std::string errorMsg = "Failed to create the throughput statistics webpage";
00577
00578 try
00579 {
00580 smWebPageHelper_->throughputWebPage(out);
00581 }
00582 catch(std::exception &e)
00583 {
00584 errorMsg += ": ";
00585 errorMsg += e.what();
00586
00587 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00588 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00589 }
00590 catch(...)
00591 {
00592 errorMsg += ": Unknown exception";
00593
00594 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00595 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00596 }
00597
00598 }
00599
00600
00601
00602 void StorageManager::consumerListWebPage(xgi::Input *in, xgi::Output *out)
00603 throw (xgi::exception::Exception)
00604 {
00605 out->getHTTPResponseHeader().addHeader( "Content-Type",
00606 "application/octet-stream" );
00607 out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
00608 "binary" );
00609 char buff;
00610 out->write( &buff, 0 );
00611 }
00612
00613
00615
00617
00618 xoap::MessageReference StorageManager::handleFSMSoapMessage( xoap::MessageReference msg )
00619 throw( xoap::exception::Exception )
00620 {
00621 std::string errorMsg;
00622 xoap::MessageReference returnMsg;
00623
00624 try {
00625 errorMsg = "Failed to extract FSM event and parameters from SOAP message: ";
00626 std::string command = soaputils::extractParameters(msg, this);
00627
00628 errorMsg = "Failed to put a '" + command + "' state machine event into command queue: ";
00629 if (command == "Configure")
00630 {
00631 sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Configure() ) );
00632 }
00633 else if (command == "Enable")
00634 {
00635 if (sharedResources_->configuration_->streamConfigurationHasChanged())
00636 {
00637 sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Reconfigure() ) );
00638 }
00639 sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Enable() ) );
00640 }
00641 else if (command == "Stop")
00642 {
00643 sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Stop() ) );
00644 }
00645 else if (command == "Halt")
00646 {
00647 sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Halt() ) );
00648 }
00649 else if (command == "EmergencyStop")
00650 {
00651 sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::EmergencyStop() ) );
00652 }
00653 else
00654 {
00655 XCEPT_RAISE(stor::exception::StateMachine,
00656 "Received an unknown state machine event '" + command + "'.");
00657 }
00658
00659 errorMsg = "Failed to create FSM SOAP reply message: ";
00660 returnMsg = soaputils::createFsmSoapResponseMsg(command,
00661 sharedResources_->statisticsReporter_->
00662 getStateMachineMonitorCollection().externallyVisibleState());
00663 }
00664 catch (cms::Exception& e) {
00665 errorMsg += e.explainSelf();
00666 XCEPT_DECLARE(xoap::exception::Exception,
00667 sentinelException, errorMsg);
00668 sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00669 throw sentinelException;
00670 }
00671 catch (xcept::Exception &e) {
00672 XCEPT_DECLARE_NESTED(xoap::exception::Exception,
00673 sentinelException, errorMsg, e);
00674 sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00675 throw sentinelException;
00676 }
00677 catch (std::exception& e) {
00678 errorMsg += e.what();
00679 XCEPT_DECLARE(xoap::exception::Exception,
00680 sentinelException, errorMsg);
00681 sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00682 throw sentinelException;
00683 }
00684 catch (...) {
00685 errorMsg += "Unknown exception";
00686 XCEPT_DECLARE(xoap::exception::Exception,
00687 sentinelException, errorMsg);
00688 sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00689 throw sentinelException;
00690 }
00691
00692 return returnMsg;
00693 }
00694
00695
00699
00700 void
00701 StorageManager::processConsumerRegistrationRequest( xgi::Input* in, xgi::Output* out )
00702 throw( xgi::exception::Exception )
00703 {
00704 consumerUtils_->processConsumerRegistrationRequest(in,out);
00705 }
00706
00707
00708 void
00709 StorageManager::processConsumerHeaderRequest( xgi::Input* in, xgi::Output* out )
00710 throw( xgi::exception::Exception )
00711 {
00712 consumerUtils_->processConsumerHeaderRequest(in,out);
00713 }
00714
00715
00716 void
00717 StorageManager::processConsumerEventRequest( xgi::Input* in, xgi::Output* out )
00718 throw( xgi::exception::Exception )
00719 {
00720 consumerUtils_->processConsumerEventRequest(in,out);
00721 }
00722
00723
00724 void
00725 StorageManager::processDQMConsumerRegistrationRequest( xgi::Input* in, xgi::Output* out )
00726 throw( xgi::exception::Exception )
00727 {
00728 consumerUtils_->processDQMConsumerRegistrationRequest(in,out);
00729 }
00730
00731
00732 void
00733 StorageManager::processDQMConsumerEventRequest( xgi::Input* in, xgi::Output* out )
00734 throw( xgi::exception::Exception )
00735 {
00736 consumerUtils_->processDQMConsumerEventRequest(in,out);
00737 }
00738
00739 namespace stor {
00741
00743 template<>
00744 void
00745 ConsumerUtils<Configuration,EventQueueCollection>::
00746 writeConsumerEvent(xgi::Output* out, const I2OChain& evt) const
00747 {
00748 writeHTTPHeaders( out );
00749
00750 #ifdef STOR_DEBUG_CORRUPTED_EVENT_HEADER
00751 double r = rand()/static_cast<double>(RAND_MAX);
00752 if (r < 0.1)
00753 {
00754 std::cout << "Simulating corrupted event header" << std::endl;
00755 EventHeader* h = (EventHeader*)evt.dataLocation(0);
00756 h->protocolVersion_ = 1;
00757 }
00758 #endif // STOR_DEBUG_CORRUPTED_EVENT_HEADER
00759
00760 const unsigned int nfrags = evt.fragmentCount();
00761 for ( unsigned int i = 0; i < nfrags; ++i )
00762 {
00763 const unsigned long len = evt.dataSize( i );
00764 unsigned char* location = evt.dataLocation( i );
00765 out->write( (char*)location, len );
00766 }
00767 }
00768 }
00769
00770
00772
00774
00775 XDAQ_INSTANTIATE(StorageManager)
00776
00777
00778
00779
00780
00781
00782
00783
00784
00785
00786
00787
00788
00789