00001
00003
00004 #include "EventFilter/StorageManager/interface/DiskWriter.h"
00005 #include "EventFilter/StorageManager/interface/DiskWriterResources.h"
00006 #include "EventFilter/StorageManager/interface/DQMEventProcessor.h"
00007 #include "EventFilter/StorageManager/interface/DQMEventProcessorResources.h"
00008 #include "EventFilter/StorageManager/interface/DiscardManager.h"
00009 #include "EventFilter/StorageManager/interface/Exception.h"
00010 #include "EventFilter/StorageManager/interface/FragmentMonitorCollection.h"
00011 #include "EventFilter/StorageManager/interface/FragmentProcessor.h"
00012 #include "EventFilter/StorageManager/interface/SoapUtils.h"
00013 #include "EventFilter/StorageManager/interface/StorageManager.h"
00014 #include "EventFilter/StorageManager/interface/StateMachine.h"
00015 #include "EventFilter/StorageManager/src/ConsumerUtils.icc"
00016
00017 #include "EventFilter/Utilities/interface/i2oEvfMsgs.h"
00018
00019 #include "FWCore/Utilities/interface/EDMException.h"
00020
00021 #include "i2o/Method.h"
00022 #include "interface/shared/version.h"
00023 #include "interface/shared/i2oXFunctionCodes.h"
00024 #include "xcept/tools.h"
00025 #include "xdaq/NamespaceURI.h"
00026 #include "xdata/InfoSpaceFactory.h"
00027 #include "xgi/Method.h"
00028 #include "xoap/Method.h"
00029
00030 #include <boost/lexical_cast.hpp>
00031 #include <boost/shared_ptr.hpp>
00032
00033 #include <cstdlib>
00034
00035 using namespace std;
00036 using namespace stor;
00037
00038
00039 StorageManager::StorageManager(xdaq::ApplicationStub * s) :
00040 xdaq::Application(s)
00041 {
00042 LOG4CPLUS_INFO(this->getApplicationLogger(),"Making StorageManager");
00043
00044
00045 bindI2OCallbacks();
00046 bindStateMachineCallbacks();
00047 bindWebInterfaceCallbacks();
00048 bindConsumerCallbacks();
00049
00050 std::string errorMsg = "Exception in StorageManager constructor: ";
00051 try
00052 {
00053 initializeSharedResources();
00054 }
00055 catch(std::exception &e)
00056 {
00057 errorMsg += e.what();
00058 LOG4CPLUS_FATAL( getApplicationLogger(), e.what() );
00059 XCEPT_RAISE( stor::exception::Exception, e.what() );
00060 }
00061 catch(...)
00062 {
00063 errorMsg += "unknown exception";
00064 LOG4CPLUS_FATAL( getApplicationLogger(), errorMsg );
00065 XCEPT_RAISE( stor::exception::Exception, errorMsg );
00066 }
00067
00068 startWorkerThreads();
00069 }
00070
00071
00072 void StorageManager::bindI2OCallbacks()
00073 {
00074 i2o::bind(this,
00075 &StorageManager::receiveRegistryMessage,
00076 I2O_SM_PREAMBLE,
00077 XDAQ_ORGANIZATION_ID);
00078 i2o::bind(this,
00079 &StorageManager::receiveDataMessage,
00080 I2O_SM_DATA,
00081 XDAQ_ORGANIZATION_ID);
00082 i2o::bind(this,
00083 &StorageManager::receiveErrorDataMessage,
00084 I2O_SM_ERROR,
00085 XDAQ_ORGANIZATION_ID);
00086 i2o::bind(this,
00087 &StorageManager::receiveDQMMessage,
00088 I2O_SM_DQM,
00089 XDAQ_ORGANIZATION_ID);
00090 i2o::bind(this,
00091 &StorageManager::receiveEndOfLumiSectionMessage,
00092 I2O_EVM_LUMISECTION,
00093 XDAQ_ORGANIZATION_ID);
00094 }
00095
00096
00097 void StorageManager::bindStateMachineCallbacks()
00098 {
00099 xoap::bind( this,
00100 &StorageManager::handleFSMSoapMessage,
00101 "Configure",
00102 XDAQ_NS_URI );
00103 xoap::bind( this,
00104 &StorageManager::handleFSMSoapMessage,
00105 "Enable",
00106 XDAQ_NS_URI );
00107 xoap::bind( this,
00108 &StorageManager::handleFSMSoapMessage,
00109 "Stop",
00110 XDAQ_NS_URI );
00111 xoap::bind( this,
00112 &StorageManager::handleFSMSoapMessage,
00113 "Halt",
00114 XDAQ_NS_URI );
00115 xoap::bind( this,
00116 &StorageManager::handleFSMSoapMessage,
00117 "EmergencyStop",
00118 XDAQ_NS_URI );
00119 }
00120
00121
00122 void StorageManager::bindWebInterfaceCallbacks()
00123 {
00124 xgi::bind(this,&StorageManager::css, "styles.css");
00125 xgi::bind(this,&StorageManager::defaultWebPage, "Default");
00126 xgi::bind(this,&StorageManager::inputWebPage, "input");
00127 xgi::bind(this,&StorageManager::storedDataWebPage, "storedData");
00128 xgi::bind(this,&StorageManager::rbsenderWebPage, "rbsenderlist");
00129 xgi::bind(this,&StorageManager::rbsenderDetailWebPage, "rbsenderdetail");
00130 xgi::bind(this,&StorageManager::fileStatisticsWebPage, "fileStatistics");
00131 xgi::bind(this,&StorageManager::dqmEventStatisticsWebPage,"dqmEventStatistics");
00132 xgi::bind(this,&StorageManager::consumerStatisticsPage, "consumerStatistics" );
00133 xgi::bind(this,&StorageManager::consumerListWebPage, "consumerList");
00134 xgi::bind(this,&StorageManager::throughputWebPage, "throughputStatistics");
00135 }
00136
00137
00138 void StorageManager::bindConsumerCallbacks()
00139 {
00140
00141 xgi::bind( this, &StorageManager::processConsumerRegistrationRequest, "registerConsumer" );
00142 xgi::bind( this, &StorageManager::processConsumerHeaderRequest, "getregdata" );
00143 xgi::bind( this, &StorageManager::processConsumerEventRequest, "geteventdata" );
00144
00145
00146 xgi::bind(this,&StorageManager::processDQMConsumerRegistrationRequest, "registerDQMConsumer");
00147 xgi::bind(this,&StorageManager::processDQMConsumerEventRequest, "getDQMeventdata");
00148 }
00149
00150
00151 void StorageManager::initializeSharedResources()
00152 {
00153 sharedResources_.reset(new SharedResources());
00154
00155 xdata::InfoSpace *ispace = getApplicationInfoSpace();
00156 unsigned long instance = getApplicationDescriptor()->getInstance();
00157 sharedResources_->configuration_.reset(new Configuration(ispace, instance));
00158
00159 QueueConfigurationParams queueParams =
00160 sharedResources_->configuration_->getQueueConfigurationParams();
00161 sharedResources_->commandQueue_.
00162 reset(new CommandQueue(queueParams.commandQueueSize_));
00163 sharedResources_->fragmentQueue_.
00164 reset(new FragmentQueue(queueParams.fragmentQueueSize_, queueParams.fragmentQueueMemoryLimitMB_ * 1024*1024));
00165 sharedResources_->registrationQueue_.
00166 reset(new RegistrationQueue(queueParams.registrationQueueSize_));
00167 sharedResources_->streamQueue_.
00168 reset(new StreamQueue(queueParams.streamQueueSize_, queueParams.streamQueueMemoryLimitMB_ * 1024*1024));
00169 sharedResources_->dqmEventQueue_.
00170 reset(new DQMEventQueue(queueParams.dqmEventQueueSize_, queueParams.dqmEventQueueMemoryLimitMB_ * 1024*1024));
00171
00172 sharedResources_->alarmHandler_.reset( new AlarmHandler(this, sharedResources_) );
00173 sharedResources_->statisticsReporter_.reset(
00174 new StatisticsReporter(this, sharedResources_)
00175 );
00176 sharedResources_->initMsgCollection_.reset(new InitMsgCollection());
00177 sharedResources_->diskWriterResources_.reset(new DiskWriterResources());
00178 sharedResources_->dqmEventProcessorResources_.reset(new DQMEventProcessorResources());
00179
00180 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setFragmentQueue(sharedResources_->fragmentQueue_);
00181 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setStreamQueue(sharedResources_->streamQueue_);
00182 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setDQMEventQueue(sharedResources_->dqmEventQueue_);
00183
00184 sharedResources_->
00185 discardManager_.reset(new DiscardManager(getApplicationContext(),
00186 getApplicationDescriptor(),
00187 sharedResources_->statisticsReporter_->
00188 getDataSenderMonitorCollection()));
00189
00190 sharedResources_->registrationCollection_.reset( new RegistrationCollection() );
00191 EventConsumerMonitorCollection& ecmc =
00192 sharedResources_->statisticsReporter_->getEventConsumerMonitorCollection();
00193 sharedResources_->eventQueueCollection_.reset( new EventQueueCollection( ecmc ) );
00194
00195 DQMConsumerMonitorCollection& dcmc =
00196 sharedResources_->statisticsReporter_->getDQMConsumerMonitorCollection();
00197 sharedResources_->dqmEventQueueCollection_.reset( new DQMEventQueueCollection( dcmc ) );
00198
00199 consumerUtils_.reset( new ConsumerUtils_t(
00200 sharedResources_->configuration_,
00201 sharedResources_->registrationCollection_,
00202 sharedResources_->registrationQueue_,
00203 sharedResources_->initMsgCollection_,
00204 sharedResources_->eventQueueCollection_,
00205 sharedResources_->dqmEventQueueCollection_,
00206 sharedResources_->alarmHandler_
00207 ) );
00208
00209 smWebPageHelper_.reset( new SMWebPageHelper(
00210 getApplicationDescriptor(), sharedResources_));
00211
00212 }
00213
00214
00215 void StorageManager::startWorkerThreads()
00216 {
00217
00218
00219 try
00220 {
00221 fragmentProcessor_.reset( new FragmentProcessor( this, sharedResources_ ) );
00222 diskWriter_.reset( new DiskWriter(this, sharedResources_) );
00223 dqmEventProcessor_.reset( new DQMEventProcessor(this, sharedResources_) );
00224 sharedResources_->statisticsReporter_->startWorkLoop("theStatisticsReporter");
00225 fragmentProcessor_->startWorkLoop("theFragmentProcessor");
00226 diskWriter_->startWorkLoop("theDiskWriter");
00227 dqmEventProcessor_->startWorkLoop("theDQMEventProcessor");
00228 }
00229 catch(xcept::Exception &e)
00230 {
00231 sharedResources_->alarmHandler_->moveToFailedState( e );
00232 }
00233 catch(std::exception &e)
00234 {
00235 XCEPT_DECLARE(stor::exception::Exception,
00236 sentinelException, e.what());
00237 sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00238 }
00239 catch(...)
00240 {
00241 std::string errorMsg = "Unknown exception when starting the workloops";
00242 XCEPT_DECLARE(stor::exception::Exception,
00243 sentinelException, errorMsg);
00244 sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00245 }
00246 }
00247
00248
00250
00252
00253 void StorageManager::receiveRegistryMessage(toolbox::mem::Reference *ref)
00254 {
00255 I2OChain i2oChain(ref);
00256
00257
00258 ThroughputMonitorCollection& throughputMonCollection =
00259 sharedResources_->statisticsReporter_->getThroughputMonitorCollection();
00260 throughputMonCollection.setMemoryPoolPointer( ref->getBuffer()->getPool() );
00261
00262 FragmentMonitorCollection& fragMonCollection =
00263 sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00264 fragMonCollection.getAllFragmentSizeMQ().addSample(
00265 static_cast<double>( i2oChain.totalDataSize() ) / 0x100000
00266 );
00267
00268 sharedResources_->fragmentQueue_->enqWait(i2oChain);
00269 }
00270
00271
00272 void StorageManager::receiveDataMessage(toolbox::mem::Reference *ref)
00273 {
00274 I2OChain i2oChain(ref);
00275
00276 FragmentMonitorCollection& fragMonCollection =
00277 sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00278 fragMonCollection.addEventFragmentSample( i2oChain.totalDataSize() );
00279
00280 sharedResources_->fragmentQueue_->enqWait(i2oChain);
00281
00282 #ifdef STOR_DEBUG_DUPLICATE_MESSAGES
00283 double r = rand()/static_cast<double>(RAND_MAX);
00284 if (r < 0.001)
00285 {
00286 LOG4CPLUS_INFO(this->getApplicationLogger(), "Simulating duplicated data message");
00287 receiveDataMessage(ref->duplicate());
00288 }
00289 #endif
00290 }
00291
00292
00293 void StorageManager::receiveErrorDataMessage(toolbox::mem::Reference *ref)
00294 {
00295 I2OChain i2oChain(ref);
00296
00297 FragmentMonitorCollection& fragMonCollection =
00298 sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00299 fragMonCollection.addEventFragmentSample( i2oChain.totalDataSize() );
00300
00301 sharedResources_->fragmentQueue_->enqWait(i2oChain);
00302 }
00303
00304
00305 void StorageManager::receiveDQMMessage(toolbox::mem::Reference *ref)
00306 {
00307 I2OChain i2oChain(ref);
00308
00309 FragmentMonitorCollection& fragMonCollection =
00310 sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00311 fragMonCollection.addDQMEventFragmentSample( i2oChain.totalDataSize() );
00312
00313 sharedResources_->fragmentQueue_->enqWait(i2oChain);
00314 }
00315
00316
00317 void StorageManager::receiveEndOfLumiSectionMessage(toolbox::mem::Reference *ref)
00318 {
00319 I2OChain i2oChain( ref );
00320
00321 FragmentMonitorCollection& fragMonCollection =
00322 sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00323 fragMonCollection.addFragmentSample( i2oChain.totalDataSize() );
00324
00325 RunMonitorCollection& runMonCollection =
00326 sharedResources_->statisticsReporter_->getRunMonitorCollection();
00327 runMonCollection.getEoLSSeenMQ().addSample( i2oChain.lumiSection() );
00328
00329 sharedResources_->streamQueue_->enqWait( i2oChain );
00330 }
00331
00332
00334
00336
00337 void StorageManager::css(xgi::Input *in, xgi::Output *out)
00338 throw (xgi::exception::Exception)
00339 {
00340 smWebPageHelper_->css(in,out);
00341 }
00342
00343
00344 void StorageManager::defaultWebPage(xgi::Input *in, xgi::Output *out)
00345 throw (xgi::exception::Exception)
00346 {
00347 std::string errorMsg = "Failed to create the default webpage";
00348
00349 try
00350 {
00351 smWebPageHelper_->defaultWebPage(out);
00352 }
00353 catch(std::exception &e)
00354 {
00355 errorMsg += ": ";
00356 errorMsg += e.what();
00357
00358 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00359 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00360 }
00361 catch(...)
00362 {
00363 errorMsg += ": Unknown exception";
00364
00365 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00366 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00367 }
00368 }
00369
00370
00371 void StorageManager::inputWebPage(xgi::Input *in, xgi::Output *out)
00372 throw (xgi::exception::Exception)
00373 {
00374 std::string errorMsg = "Failed to create the I2O input webpage";
00375
00376 try
00377 {
00378 smWebPageHelper_->inputWebPage(out);
00379 }
00380 catch(std::exception &e)
00381 {
00382 errorMsg += ": ";
00383 errorMsg += e.what();
00384
00385 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00386 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00387 }
00388 catch(...)
00389 {
00390 errorMsg += ": Unknown exception";
00391
00392 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00393 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00394 }
00395 }
00396
00397
00398 void StorageManager::storedDataWebPage(xgi::Input *in, xgi::Output *out)
00399 throw (xgi::exception::Exception)
00400 {
00401 std::string errorMsg = "Failed to create the stored data webpage";
00402
00403 try
00404 {
00405 smWebPageHelper_->storedDataWebPage(out);
00406 }
00407 catch(std::exception &e)
00408 {
00409 errorMsg += ": ";
00410 errorMsg += e.what();
00411
00412 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00413 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00414 }
00415 catch(...)
00416 {
00417 errorMsg += ": Unknown exception";
00418
00419 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00420 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00421 }
00422 }
00423
00424
00425 void StorageManager::consumerStatisticsPage( xgi::Input* in,
00426 xgi::Output* out )
00427 throw( xgi::exception::Exception )
00428 {
00429
00430 std::string err_msg =
00431 "Failed to create consumer statistics page";
00432
00433 try
00434 {
00435 smWebPageHelper_->consumerStatistics(out);
00436 }
00437 catch( std::exception &e )
00438 {
00439 err_msg += ": ";
00440 err_msg += e.what();
00441 LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
00442 XCEPT_RAISE( xgi::exception::Exception, err_msg );
00443 }
00444 catch(...)
00445 {
00446 err_msg += ": Unknown exception";
00447 LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
00448 XCEPT_RAISE( xgi::exception::Exception, err_msg );
00449 }
00450
00451 }
00452
00453
00454 void StorageManager::rbsenderWebPage(xgi::Input *in, xgi::Output *out)
00455 throw (xgi::exception::Exception)
00456 {
00457 std::string errorMsg = "Failed to create the data sender webpage";
00458
00459 try
00460 {
00461 smWebPageHelper_->resourceBrokerOverview(out);
00462 }
00463 catch(std::exception &e)
00464 {
00465 errorMsg += ": ";
00466 errorMsg += e.what();
00467
00468 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00469 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00470 }
00471 catch(...)
00472 {
00473 errorMsg += ": Unknown exception";
00474
00475 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00476 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00477 }
00478
00479 }
00480
00481
00482 void StorageManager::rbsenderDetailWebPage(xgi::Input *in, xgi::Output *out)
00483 throw (xgi::exception::Exception)
00484 {
00485 std::string errorMsg = "Failed to create the data sender webpage";
00486
00487 try
00488 {
00489 long long localRBID = 0;
00490 cgicc::Cgicc cgiWrapper(in);
00491 cgicc::const_form_iterator updateRef = cgiWrapper.getElement("id");
00492 if (updateRef != cgiWrapper.getElements().end())
00493 {
00494 std::string idString = updateRef->getValue();
00495 localRBID = boost::lexical_cast<long long>(idString);
00496 }
00497
00498 smWebPageHelper_->resourceBrokerDetail(out, localRBID);
00499 }
00500 catch(std::exception &e)
00501 {
00502 errorMsg += ": ";
00503 errorMsg += e.what();
00504
00505 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00506 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00507 }
00508 catch(...)
00509 {
00510 errorMsg += ": Unknown exception";
00511
00512 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00513 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00514 }
00515
00516 }
00517
00518
00519 void StorageManager::fileStatisticsWebPage(xgi::Input *in, xgi::Output *out)
00520 throw (xgi::exception::Exception)
00521 {
00522 std::string errorMsg = "Failed to create the file statistics webpage";
00523
00524 try
00525 {
00526 smWebPageHelper_->filesWebPage(out);
00527 }
00528 catch(std::exception &e)
00529 {
00530 errorMsg += ": ";
00531 errorMsg += e.what();
00532
00533 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00534 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00535 }
00536 catch(...)
00537 {
00538 errorMsg += ": Unknown exception";
00539
00540 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00541 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00542 }
00543
00544 }
00545
00546
00547 void StorageManager::dqmEventStatisticsWebPage(xgi::Input *in, xgi::Output *out)
00548 throw (xgi::exception::Exception)
00549 {
00550 std::string errorMsg = "Failed to create the DQM event statistics webpage";
00551
00552 try
00553 {
00554 smWebPageHelper_->dqmEventWebPage(out);
00555 }
00556 catch(std::exception &e)
00557 {
00558 errorMsg += ": ";
00559 errorMsg += e.what();
00560
00561 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00562 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00563 }
00564 catch(...)
00565 {
00566 errorMsg += ": Unknown exception";
00567
00568 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00569 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00570 }
00571 }
00572
00573
00574 void StorageManager::throughputWebPage(xgi::Input *in, xgi::Output *out)
00575 throw (xgi::exception::Exception)
00576 {
00577 std::string errorMsg = "Failed to create the throughput statistics webpage";
00578
00579 try
00580 {
00581 smWebPageHelper_->throughputWebPage(out);
00582 }
00583 catch(std::exception &e)
00584 {
00585 errorMsg += ": ";
00586 errorMsg += e.what();
00587
00588 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00589 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00590 }
00591 catch(...)
00592 {
00593 errorMsg += ": Unknown exception";
00594
00595 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00596 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00597 }
00598
00599 }
00600
00601
00602
00603 void StorageManager::consumerListWebPage(xgi::Input *in, xgi::Output *out)
00604 throw (xgi::exception::Exception)
00605 {
00606 out->getHTTPResponseHeader().addHeader( "Content-Type",
00607 "application/octet-stream" );
00608 out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
00609 "binary" );
00610 char buff;
00611 out->write( &buff, 0 );
00612 }
00613
00614
00616
00618
00619 xoap::MessageReference StorageManager::handleFSMSoapMessage( xoap::MessageReference msg )
00620 throw( xoap::exception::Exception )
00621 {
00622 std::string errorMsg;
00623 xoap::MessageReference returnMsg;
00624
00625 try {
00626 errorMsg = "Failed to extract FSM event and parameters from SOAP message: ";
00627 std::string command = soaputils::extractParameters(msg, this);
00628
00629 errorMsg = "Failed to put a '" + command + "' state machine event into command queue: ";
00630 if (command == "Configure")
00631 {
00632 sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Configure() ) );
00633 }
00634 else if (command == "Enable")
00635 {
00636 if (sharedResources_->configuration_->streamConfigurationHasChanged())
00637 {
00638 sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Reconfigure() ) );
00639 }
00640 sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Enable() ) );
00641 }
00642 else if (command == "Stop")
00643 {
00644 sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Stop() ) );
00645 }
00646 else if (command == "Halt")
00647 {
00648 sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Halt() ) );
00649 }
00650 else if (command == "EmergencyStop")
00651 {
00652 sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::EmergencyStop() ) );
00653 }
00654 else
00655 {
00656 XCEPT_RAISE(stor::exception::StateMachine,
00657 "Received an unknown state machine event '" + command + "'.");
00658 }
00659
00660 errorMsg = "Failed to create FSM SOAP reply message: ";
00661 returnMsg = soaputils::createFsmSoapResponseMsg(command,
00662 sharedResources_->statisticsReporter_->
00663 getStateMachineMonitorCollection().externallyVisibleState());
00664 }
00665 catch (cms::Exception& e) {
00666 errorMsg += e.explainSelf();
00667 XCEPT_DECLARE(xoap::exception::Exception,
00668 sentinelException, errorMsg);
00669 sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00670 throw sentinelException;
00671 }
00672 catch (xcept::Exception &e) {
00673 XCEPT_DECLARE_NESTED(xoap::exception::Exception,
00674 sentinelException, errorMsg, e);
00675 sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00676 throw sentinelException;
00677 }
00678 catch (std::exception& e) {
00679 errorMsg += e.what();
00680 XCEPT_DECLARE(xoap::exception::Exception,
00681 sentinelException, errorMsg);
00682 sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00683 throw sentinelException;
00684 }
00685 catch (...) {
00686 errorMsg += "Unknown exception";
00687 XCEPT_DECLARE(xoap::exception::Exception,
00688 sentinelException, errorMsg);
00689 sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00690 throw sentinelException;
00691 }
00692
00693 return returnMsg;
00694 }
00695
00696
00700
00701 void
00702 StorageManager::processConsumerRegistrationRequest( xgi::Input* in, xgi::Output* out )
00703 throw( xgi::exception::Exception )
00704 {
00705 consumerUtils_->processConsumerRegistrationRequest(in,out);
00706 }
00707
00708
00709 void
00710 StorageManager::processConsumerHeaderRequest( xgi::Input* in, xgi::Output* out )
00711 throw( xgi::exception::Exception )
00712 {
00713 consumerUtils_->processConsumerHeaderRequest(in,out);
00714 }
00715
00716
00717 void
00718 StorageManager::processConsumerEventRequest( xgi::Input* in, xgi::Output* out )
00719 throw( xgi::exception::Exception )
00720 {
00721 consumerUtils_->processConsumerEventRequest(in,out);
00722 }
00723
00724
00725 void
00726 StorageManager::processDQMConsumerRegistrationRequest( xgi::Input* in, xgi::Output* out )
00727 throw( xgi::exception::Exception )
00728 {
00729 consumerUtils_->processDQMConsumerRegistrationRequest(in,out);
00730 }
00731
00732
00733 void
00734 StorageManager::processDQMConsumerEventRequest( xgi::Input* in, xgi::Output* out )
00735 throw( xgi::exception::Exception )
00736 {
00737 consumerUtils_->processDQMConsumerEventRequest(in,out);
00738 }
00739
00740 namespace stor {
00742
00744 template<>
00745 void
00746 ConsumerUtils<Configuration,EventQueueCollection>::
00747 writeConsumerEvent(xgi::Output* out, const I2OChain& evt) const
00748 {
00749 writeHTTPHeaders( out );
00750
00751 #ifdef STOR_DEBUG_CORRUPTED_EVENT_HEADER
00752 double r = rand()/static_cast<double>(RAND_MAX);
00753 if (r < 0.1)
00754 {
00755 std::cout << "Simulating corrupted event header" << std::endl;
00756 EventHeader* h = (EventHeader*)evt.dataLocation(0);
00757 h->protocolVersion_ = 1;
00758 }
00759 #endif // STOR_DEBUG_CORRUPTED_EVENT_HEADER
00760
00761 const unsigned int nfrags = evt.fragmentCount();
00762 for ( unsigned int i = 0; i < nfrags; ++i )
00763 {
00764 const unsigned long len = evt.dataSize( i );
00765 unsigned char* location = evt.dataLocation( i );
00766 out->write( (char*)location, len );
00767 }
00768 }
00769 }
00770
00771
00773
00775
00776 XDAQ_INSTANTIATE(StorageManager)
00777
00778
00779
00780
00781
00782
00783
00784
00785
00786
00787
00788
00789
00790