00001
00003
00004 #include "EventFilter/StorageManager/interface/DiskWriter.h"
00005 #include "EventFilter/StorageManager/interface/DiskWriterResources.h"
00006 #include "EventFilter/StorageManager/interface/DQMEventProcessor.h"
00007 #include "EventFilter/StorageManager/interface/DQMEventProcessorResources.h"
00008 #include "EventFilter/StorageManager/interface/DiscardManager.h"
00009 #include "EventFilter/StorageManager/interface/Exception.h"
00010 #include "EventFilter/StorageManager/interface/FragmentMonitorCollection.h"
00011 #include "EventFilter/StorageManager/interface/FragmentProcessor.h"
00012 #include "EventFilter/StorageManager/interface/SoapUtils.h"
00013 #include "EventFilter/StorageManager/interface/StorageManager.h"
00014 #include "EventFilter/StorageManager/interface/StateMachine.h"
00015 #include "EventFilter/StorageManager/src/ConsumerUtils.icc"
00016
00017 #include "EventFilter/Utilities/interface/i2oEvfMsgs.h"
00018
00019 #include "FWCore/Utilities/interface/EDMException.h"
00020
00021 #include "i2o/Method.h"
00022 #include "interface/shared/version.h"
00023 #include "interface/shared/i2oXFunctionCodes.h"
00024 #include "xcept/tools.h"
00025 #include "xdaq/NamespaceURI.h"
00026 #include "xdata/InfoSpaceFactory.h"
00027 #include "xgi/Method.h"
00028 #include "xoap/Method.h"
00029
00030 #include <boost/lexical_cast.hpp>
00031 #include <boost/shared_ptr.hpp>
00032
00033 #include <cstdlib>
00034
00035 using namespace std;
00036 using namespace stor;
00037
00038
00039 StorageManager::StorageManager(xdaq::ApplicationStub * s) :
00040 xdaq::Application(s)
00041 {
00042 LOG4CPLUS_INFO(this->getApplicationLogger(),"Making StorageManager");
00043
00044
00045 bindI2OCallbacks();
00046 bindStateMachineCallbacks();
00047 bindWebInterfaceCallbacks();
00048 bindConsumerCallbacks();
00049
00050 std::string errorMsg = "Exception in StorageManager constructor: ";
00051 try
00052 {
00053 initializeSharedResources();
00054 }
00055 catch(std::exception &e)
00056 {
00057 errorMsg += e.what();
00058 LOG4CPLUS_FATAL( getApplicationLogger(), e.what() );
00059 XCEPT_RAISE( stor::exception::Exception, e.what() );
00060 }
00061 catch(...)
00062 {
00063 errorMsg += "unknown exception";
00064 LOG4CPLUS_FATAL( getApplicationLogger(), errorMsg );
00065 XCEPT_RAISE( stor::exception::Exception, errorMsg );
00066 }
00067
00068 startWorkerThreads();
00069 }
00070
00071
00072 void StorageManager::bindI2OCallbacks()
00073 {
00074 i2o::bind(this,
00075 &StorageManager::receiveRegistryMessage,
00076 I2O_SM_PREAMBLE,
00077 XDAQ_ORGANIZATION_ID);
00078 i2o::bind(this,
00079 &StorageManager::receiveDataMessage,
00080 I2O_SM_DATA,
00081 XDAQ_ORGANIZATION_ID);
00082 i2o::bind(this,
00083 &StorageManager::receiveErrorDataMessage,
00084 I2O_SM_ERROR,
00085 XDAQ_ORGANIZATION_ID);
00086 i2o::bind(this,
00087 &StorageManager::receiveDQMMessage,
00088 I2O_SM_DQM,
00089 XDAQ_ORGANIZATION_ID);
00090 i2o::bind(this,
00091 &StorageManager::receiveEndOfLumiSectionMessage,
00092 I2O_EVM_LUMISECTION,
00093 XDAQ_ORGANIZATION_ID);
00094 }
00095
00096
00097 void StorageManager::bindStateMachineCallbacks()
00098 {
00099 xoap::bind( this,
00100 &StorageManager::handleFSMSoapMessage,
00101 "Configure",
00102 XDAQ_NS_URI );
00103 xoap::bind( this,
00104 &StorageManager::handleFSMSoapMessage,
00105 "Enable",
00106 XDAQ_NS_URI );
00107 xoap::bind( this,
00108 &StorageManager::handleFSMSoapMessage,
00109 "Stop",
00110 XDAQ_NS_URI );
00111 xoap::bind( this,
00112 &StorageManager::handleFSMSoapMessage,
00113 "Halt",
00114 XDAQ_NS_URI );
00115 xoap::bind( this,
00116 &StorageManager::handleFSMSoapMessage,
00117 "EmergencyStop",
00118 XDAQ_NS_URI );
00119 }
00120
00121
00122 void StorageManager::bindWebInterfaceCallbacks()
00123 {
00124 xgi::bind(this,&StorageManager::css, "styles.css");
00125 xgi::bind(this,&StorageManager::defaultWebPage, "Default");
00126 xgi::bind(this,&StorageManager::storedDataWebPage, "storedData");
00127 xgi::bind(this,&StorageManager::rbsenderWebPage, "rbsenderlist");
00128 xgi::bind(this,&StorageManager::rbsenderDetailWebPage, "rbsenderdetail");
00129 xgi::bind(this,&StorageManager::fileStatisticsWebPage, "fileStatistics");
00130 xgi::bind(this,&StorageManager::dqmEventStatisticsWebPage,"dqmEventStatistics");
00131 xgi::bind(this,&StorageManager::consumerStatisticsPage, "consumerStatistics" );
00132 xgi::bind(this,&StorageManager::consumerListWebPage, "consumerList");
00133 xgi::bind(this,&StorageManager::throughputWebPage, "throughputStatistics");
00134 }
00135
00136
00137 void StorageManager::bindConsumerCallbacks()
00138 {
00139
00140 xgi::bind( this, &StorageManager::processConsumerRegistrationRequest, "registerConsumer" );
00141 xgi::bind( this, &StorageManager::processConsumerHeaderRequest, "getregdata" );
00142 xgi::bind( this, &StorageManager::processConsumerEventRequest, "geteventdata" );
00143
00144
00145 xgi::bind(this,&StorageManager::processDQMConsumerRegistrationRequest, "registerDQMConsumer");
00146 xgi::bind(this,&StorageManager::processDQMConsumerEventRequest, "getDQMeventdata");
00147 }
00148
00149
00150 void StorageManager::initializeSharedResources()
00151 {
00152 sharedResources_.reset(new SharedResources());
00153
00154 xdata::InfoSpace *ispace = getApplicationInfoSpace();
00155 unsigned long instance = getApplicationDescriptor()->getInstance();
00156 sharedResources_->configuration_.reset(new Configuration(ispace, instance));
00157
00158 QueueConfigurationParams queueParams =
00159 sharedResources_->configuration_->getQueueConfigurationParams();
00160 sharedResources_->commandQueue_.
00161 reset(new CommandQueue(queueParams.commandQueueSize_));
00162 sharedResources_->fragmentQueue_.
00163 reset(new FragmentQueue(queueParams.fragmentQueueSize_, queueParams.fragmentQueueMemoryLimitMB_ * 1024*1024));
00164 sharedResources_->registrationQueue_.
00165 reset(new RegistrationQueue(queueParams.registrationQueueSize_));
00166 sharedResources_->streamQueue_.
00167 reset(new StreamQueue(queueParams.streamQueueSize_, queueParams.streamQueueMemoryLimitMB_ * 1024*1024));
00168 sharedResources_->dqmEventQueue_.
00169 reset(new DQMEventQueue(queueParams.dqmEventQueueSize_, queueParams.dqmEventQueueMemoryLimitMB_ * 1024*1024));
00170
00171 sharedResources_->alarmHandler_.reset( new AlarmHandler(this, sharedResources_) );
00172 sharedResources_->statisticsReporter_.reset(
00173 new StatisticsReporter(this, sharedResources_)
00174 );
00175 sharedResources_->initMsgCollection_.reset(new InitMsgCollection());
00176 sharedResources_->diskWriterResources_.reset(new DiskWriterResources());
00177 sharedResources_->dqmEventProcessorResources_.reset(new DQMEventProcessorResources());
00178
00179 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setFragmentQueue(sharedResources_->fragmentQueue_);
00180 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setStreamQueue(sharedResources_->streamQueue_);
00181 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setDQMEventQueue(sharedResources_->dqmEventQueue_);
00182
00183 sharedResources_->
00184 discardManager_.reset(new DiscardManager(getApplicationContext(),
00185 getApplicationDescriptor(),
00186 sharedResources_->statisticsReporter_->
00187 getDataSenderMonitorCollection()));
00188
00189 sharedResources_->registrationCollection_.reset( new RegistrationCollection() );
00190 EventConsumerMonitorCollection& ecmc =
00191 sharedResources_->statisticsReporter_->getEventConsumerMonitorCollection();
00192 sharedResources_->eventQueueCollection_.reset( new EventQueueCollection( ecmc ) );
00193
00194 DQMConsumerMonitorCollection& dcmc =
00195 sharedResources_->statisticsReporter_->getDQMConsumerMonitorCollection();
00196 sharedResources_->dqmEventQueueCollection_.reset( new DQMEventQueueCollection( dcmc ) );
00197
00198 consumerUtils_.reset( new ConsumerUtils_t(
00199 sharedResources_->configuration_,
00200 sharedResources_->registrationCollection_,
00201 sharedResources_->registrationQueue_,
00202 sharedResources_->initMsgCollection_,
00203 sharedResources_->eventQueueCollection_,
00204 sharedResources_->dqmEventQueueCollection_,
00205 sharedResources_->alarmHandler_
00206 ) );
00207
00208 smWebPageHelper_.reset( new SMWebPageHelper(
00209 getApplicationDescriptor(), sharedResources_));
00210
00211 }
00212
00213
00214 void StorageManager::startWorkerThreads()
00215 {
00216
00217
00218 try
00219 {
00220 fragmentProcessor_.reset( new FragmentProcessor( this, sharedResources_ ) );
00221 diskWriter_.reset( new DiskWriter(this, sharedResources_) );
00222 dqmEventProcessor_.reset( new DQMEventProcessor(this, sharedResources_) );
00223 sharedResources_->statisticsReporter_->startWorkLoop("theStatisticsReporter");
00224 fragmentProcessor_->startWorkLoop("theFragmentProcessor");
00225 diskWriter_->startWorkLoop("theDiskWriter");
00226 dqmEventProcessor_->startWorkLoop("theDQMEventProcessor");
00227 }
00228 catch(xcept::Exception &e)
00229 {
00230 sharedResources_->alarmHandler_->moveToFailedState( e );
00231 }
00232 catch(std::exception &e)
00233 {
00234 XCEPT_DECLARE(stor::exception::Exception,
00235 sentinelException, e.what());
00236 sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00237 }
00238 catch(...)
00239 {
00240 std::string errorMsg = "Unknown exception when starting the workloops";
00241 XCEPT_DECLARE(stor::exception::Exception,
00242 sentinelException, errorMsg);
00243 sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00244 }
00245 }
00246
00247
00249
00251
00252 void StorageManager::receiveRegistryMessage(toolbox::mem::Reference *ref)
00253 {
00254 I2OChain i2oChain(ref);
00255
00256
00257 ThroughputMonitorCollection& throughputMonCollection =
00258 sharedResources_->statisticsReporter_->getThroughputMonitorCollection();
00259 throughputMonCollection.setMemoryPoolPointer( ref->getBuffer()->getPool() );
00260
00261 FragmentMonitorCollection& fragMonCollection =
00262 sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00263 fragMonCollection.getAllFragmentSizeMQ().addSample(
00264 static_cast<double>( i2oChain.totalDataSize() ) / 0x100000
00265 );
00266
00267 sharedResources_->fragmentQueue_->enqWait(i2oChain);
00268 }
00269
00270
00271 void StorageManager::receiveDataMessage(toolbox::mem::Reference *ref)
00272 {
00273 I2OChain i2oChain(ref);
00274
00275 FragmentMonitorCollection& fragMonCollection =
00276 sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00277 fragMonCollection.addEventFragmentSample( i2oChain.totalDataSize() );
00278
00279 sharedResources_->fragmentQueue_->enqWait(i2oChain);
00280
00281 #ifdef STOR_DEBUG_DUPLICATE_MESSAGES
00282 double r = rand()/static_cast<double>(RAND_MAX);
00283 if (r < 0.001)
00284 {
00285 LOG4CPLUS_INFO(this->getApplicationLogger(), "Simulating duplicated data message");
00286 receiveDataMessage(ref->duplicate());
00287 }
00288 #endif
00289 }
00290
00291
00292 void StorageManager::receiveErrorDataMessage(toolbox::mem::Reference *ref)
00293 {
00294 I2OChain i2oChain(ref);
00295
00296 FragmentMonitorCollection& fragMonCollection =
00297 sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00298 fragMonCollection.addEventFragmentSample( i2oChain.totalDataSize() );
00299
00300 sharedResources_->fragmentQueue_->enqWait(i2oChain);
00301 }
00302
00303
00304 void StorageManager::receiveDQMMessage(toolbox::mem::Reference *ref)
00305 {
00306 I2OChain i2oChain(ref);
00307
00308 FragmentMonitorCollection& fragMonCollection =
00309 sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00310 fragMonCollection.addDQMEventFragmentSample( i2oChain.totalDataSize() );
00311
00312 sharedResources_->fragmentQueue_->enqWait(i2oChain);
00313 }
00314
00315
00316 void StorageManager::receiveEndOfLumiSectionMessage(toolbox::mem::Reference *ref)
00317 {
00318 I2OChain i2oChain( ref );
00319
00320 FragmentMonitorCollection& fragMonCollection =
00321 sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00322 fragMonCollection.addFragmentSample( i2oChain.totalDataSize() );
00323
00324 RunMonitorCollection& runMonCollection =
00325 sharedResources_->statisticsReporter_->getRunMonitorCollection();
00326 runMonCollection.getEoLSSeenMQ().addSample( i2oChain.lumiSection() );
00327
00328 sharedResources_->streamQueue_->enqWait( i2oChain );
00329 }
00330
00331
00333
00335
00336 void StorageManager::css(xgi::Input *in, xgi::Output *out)
00337 throw (xgi::exception::Exception)
00338 {
00339 smWebPageHelper_->css(in,out);
00340 }
00341
00342
00343 void StorageManager::defaultWebPage(xgi::Input *in, xgi::Output *out)
00344 throw (xgi::exception::Exception)
00345 {
00346 std::string errorMsg = "Failed to create the default webpage";
00347
00348 try
00349 {
00350 smWebPageHelper_->defaultWebPage(out);
00351 }
00352 catch(std::exception &e)
00353 {
00354 errorMsg += ": ";
00355 errorMsg += e.what();
00356
00357 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00358 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00359 }
00360 catch(...)
00361 {
00362 errorMsg += ": Unknown exception";
00363
00364 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00365 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00366 }
00367 }
00368
00369
00370 void StorageManager::storedDataWebPage(xgi::Input *in, xgi::Output *out)
00371 throw (xgi::exception::Exception)
00372 {
00373 std::string errorMsg = "Failed to create the stored data webpage";
00374
00375 try
00376 {
00377 smWebPageHelper_->storedDataWebPage(out);
00378 }
00379 catch(std::exception &e)
00380 {
00381 errorMsg += ": ";
00382 errorMsg += e.what();
00383
00384 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00385 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00386 }
00387 catch(...)
00388 {
00389 errorMsg += ": Unknown exception";
00390
00391 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00392 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00393 }
00394 }
00395
00396
00397 void StorageManager::consumerStatisticsPage( xgi::Input* in,
00398 xgi::Output* out )
00399 throw( xgi::exception::Exception )
00400 {
00401
00402 std::string err_msg =
00403 "Failed to create consumer statistics page";
00404
00405 try
00406 {
00407 smWebPageHelper_->consumerStatistics(out);
00408 }
00409 catch( std::exception &e )
00410 {
00411 err_msg += ": ";
00412 err_msg += e.what();
00413 LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
00414 XCEPT_RAISE( xgi::exception::Exception, err_msg );
00415 }
00416 catch(...)
00417 {
00418 err_msg += ": Unknown exception";
00419 LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
00420 XCEPT_RAISE( xgi::exception::Exception, err_msg );
00421 }
00422
00423 }
00424
00425
00426 void StorageManager::rbsenderWebPage(xgi::Input *in, xgi::Output *out)
00427 throw (xgi::exception::Exception)
00428 {
00429 std::string errorMsg = "Failed to create the data sender webpage";
00430
00431 try
00432 {
00433 smWebPageHelper_->resourceBrokerOverview(out);
00434 }
00435 catch(std::exception &e)
00436 {
00437 errorMsg += ": ";
00438 errorMsg += e.what();
00439
00440 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00441 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00442 }
00443 catch(...)
00444 {
00445 errorMsg += ": Unknown exception";
00446
00447 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00448 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00449 }
00450
00451 }
00452
00453
00454 void StorageManager::rbsenderDetailWebPage(xgi::Input *in, xgi::Output *out)
00455 throw (xgi::exception::Exception)
00456 {
00457 std::string errorMsg = "Failed to create the data sender webpage";
00458
00459 try
00460 {
00461 long long localRBID = 0;
00462 cgicc::Cgicc cgiWrapper(in);
00463 cgicc::const_form_iterator updateRef = cgiWrapper.getElement("id");
00464 if (updateRef != cgiWrapper.getElements().end())
00465 {
00466 std::string idString = updateRef->getValue();
00467 localRBID = boost::lexical_cast<long long>(idString);
00468 }
00469
00470 smWebPageHelper_->resourceBrokerDetail(out, localRBID);
00471 }
00472 catch(std::exception &e)
00473 {
00474 errorMsg += ": ";
00475 errorMsg += e.what();
00476
00477 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00478 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00479 }
00480 catch(...)
00481 {
00482 errorMsg += ": Unknown exception";
00483
00484 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00485 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00486 }
00487
00488 }
00489
00490
00491 void StorageManager::fileStatisticsWebPage(xgi::Input *in, xgi::Output *out)
00492 throw (xgi::exception::Exception)
00493 {
00494 std::string errorMsg = "Failed to create the file statistics webpage";
00495
00496 try
00497 {
00498 smWebPageHelper_->filesWebPage(out);
00499 }
00500 catch(std::exception &e)
00501 {
00502 errorMsg += ": ";
00503 errorMsg += e.what();
00504
00505 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00506 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00507 }
00508 catch(...)
00509 {
00510 errorMsg += ": Unknown exception";
00511
00512 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00513 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00514 }
00515
00516 }
00517
00518
00519 void StorageManager::dqmEventStatisticsWebPage(xgi::Input *in, xgi::Output *out)
00520 throw (xgi::exception::Exception)
00521 {
00522 std::string errorMsg = "Failed to create the DQM event statistics webpage";
00523
00524 try
00525 {
00526 smWebPageHelper_->dqmEventWebPage(out);
00527 }
00528 catch(std::exception &e)
00529 {
00530 errorMsg += ": ";
00531 errorMsg += e.what();
00532
00533 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00534 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00535 }
00536 catch(...)
00537 {
00538 errorMsg += ": Unknown exception";
00539
00540 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00541 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00542 }
00543 }
00544
00545
00546 void StorageManager::throughputWebPage(xgi::Input *in, xgi::Output *out)
00547 throw (xgi::exception::Exception)
00548 {
00549 std::string errorMsg = "Failed to create the throughput statistics webpage";
00550
00551 try
00552 {
00553 smWebPageHelper_->throughputWebPage(out);
00554 }
00555 catch(std::exception &e)
00556 {
00557 errorMsg += ": ";
00558 errorMsg += e.what();
00559
00560 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00561 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00562 }
00563 catch(...)
00564 {
00565 errorMsg += ": Unknown exception";
00566
00567 LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00568 XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00569 }
00570
00571 }
00572
00573
00574
00575 void StorageManager::consumerListWebPage(xgi::Input *in, xgi::Output *out)
00576 throw (xgi::exception::Exception)
00577 {
00578 out->getHTTPResponseHeader().addHeader( "Content-Type",
00579 "application/octet-stream" );
00580 out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
00581 "binary" );
00582 char buff;
00583 out->write( &buff, 0 );
00584 }
00585
00586
00588
00590
00591 xoap::MessageReference StorageManager::handleFSMSoapMessage( xoap::MessageReference msg )
00592 throw( xoap::exception::Exception )
00593 {
00594 std::string errorMsg;
00595 xoap::MessageReference returnMsg;
00596
00597 try {
00598 errorMsg = "Failed to extract FSM event and parameters from SOAP message: ";
00599 std::string command = soaputils::extractParameters(msg, this);
00600
00601 errorMsg = "Failed to put a '" + command + "' state machine event into command queue: ";
00602 if (command == "Configure")
00603 {
00604 sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Configure() ) );
00605 }
00606 else if (command == "Enable")
00607 {
00608 if (sharedResources_->configuration_->streamConfigurationHasChanged())
00609 {
00610 sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Reconfigure() ) );
00611 }
00612 sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Enable() ) );
00613 }
00614 else if (command == "Stop")
00615 {
00616 sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Stop() ) );
00617 }
00618 else if (command == "Halt")
00619 {
00620 sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Halt() ) );
00621 }
00622 else if (command == "EmergencyStop")
00623 {
00624 sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::EmergencyStop() ) );
00625 }
00626 else
00627 {
00628 XCEPT_RAISE(stor::exception::StateMachine,
00629 "Received an unknown state machine event '" + command + "'.");
00630 }
00631
00632 errorMsg = "Failed to create FSM SOAP reply message: ";
00633 returnMsg = soaputils::createFsmSoapResponseMsg(command,
00634 sharedResources_->statisticsReporter_->
00635 getStateMachineMonitorCollection().externallyVisibleState());
00636 }
00637 catch (cms::Exception& e) {
00638 errorMsg += e.explainSelf();
00639 XCEPT_DECLARE(xoap::exception::Exception,
00640 sentinelException, errorMsg);
00641 sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00642 throw sentinelException;
00643 }
00644 catch (xcept::Exception &e) {
00645 XCEPT_DECLARE_NESTED(xoap::exception::Exception,
00646 sentinelException, errorMsg, e);
00647 sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00648 throw sentinelException;
00649 }
00650 catch (std::exception& e) {
00651 errorMsg += e.what();
00652 XCEPT_DECLARE(xoap::exception::Exception,
00653 sentinelException, errorMsg);
00654 sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00655 throw sentinelException;
00656 }
00657 catch (...) {
00658 errorMsg += "Unknown exception";
00659 XCEPT_DECLARE(xoap::exception::Exception,
00660 sentinelException, errorMsg);
00661 sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00662 throw sentinelException;
00663 }
00664
00665 return returnMsg;
00666 }
00667
00668
00672
00673 void
00674 StorageManager::processConsumerRegistrationRequest( xgi::Input* in, xgi::Output* out )
00675 throw( xgi::exception::Exception )
00676 {
00677 consumerUtils_->processConsumerRegistrationRequest(in,out);
00678 }
00679
00680
00681 void
00682 StorageManager::processConsumerHeaderRequest( xgi::Input* in, xgi::Output* out )
00683 throw( xgi::exception::Exception )
00684 {
00685 consumerUtils_->processConsumerHeaderRequest(in,out);
00686 }
00687
00688
00689 void
00690 StorageManager::processConsumerEventRequest( xgi::Input* in, xgi::Output* out )
00691 throw( xgi::exception::Exception )
00692 {
00693 consumerUtils_->processConsumerEventRequest(in,out);
00694 }
00695
00696
00697 void
00698 StorageManager::processDQMConsumerRegistrationRequest( xgi::Input* in, xgi::Output* out )
00699 throw( xgi::exception::Exception )
00700 {
00701 consumerUtils_->processDQMConsumerRegistrationRequest(in,out);
00702 }
00703
00704
00705 void
00706 StorageManager::processDQMConsumerEventRequest( xgi::Input* in, xgi::Output* out )
00707 throw( xgi::exception::Exception )
00708 {
00709 consumerUtils_->processDQMConsumerEventRequest(in,out);
00710 }
00711
00712 namespace stor {
00714
00716 template<>
00717 void
00718 ConsumerUtils<Configuration,EventQueueCollection>::
00719 writeConsumerEvent(xgi::Output* out, const I2OChain& evt) const
00720 {
00721 writeHTTPHeaders( out );
00722
00723 #ifdef STOR_DEBUG_CORRUPTED_EVENT_HEADER
00724 double r = rand()/static_cast<double>(RAND_MAX);
00725 if (r < 0.1)
00726 {
00727 std::cout << "Simulating corrupted event header" << std::endl;
00728 EventHeader* h = (EventHeader*)evt.dataLocation(0);
00729 h->protocolVersion_ = 1;
00730 }
00731 #endif // STOR_DEBUG_CORRUPTED_EVENT_HEADER
00732
00733 const unsigned int nfrags = evt.fragmentCount();
00734 for ( unsigned int i = 0; i < nfrags; ++i )
00735 {
00736 const unsigned long len = evt.dataSize( i );
00737 unsigned char* location = evt.dataLocation( i );
00738 out->write( (char*)location, len );
00739 }
00740 }
00741 }
00742
00743
00745
00747
// Expands the XDAQ_INSTANTIATE macro for the StorageManager class.
// NOTE(review): presumably this provides the hook through which the XDAQ
// framework creates the application instance - confirm against the xdaq
// headers.
00748 XDAQ_INSTANTIATE(StorageManager)
00749
00750
00751
00752
00753
00754
00755
00756
00757
00758
00759
00760
00761
00762