CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_4_5_patch3/src/EventFilter/StorageManager/src/StorageManager.cc

Go to the documentation of this file.
00001 // $Id: StorageManager.cc,v 1.137 2011/11/08 10:48:41 mommsen Exp $
00003 
00004 #include "EventFilter/StorageManager/interface/DiskWriter.h"
00005 #include "EventFilter/StorageManager/interface/DiskWriterResources.h"
00006 #include "EventFilter/StorageManager/interface/DQMEventProcessor.h"
00007 #include "EventFilter/StorageManager/interface/DQMEventProcessorResources.h"
00008 #include "EventFilter/StorageManager/interface/DiscardManager.h"
00009 #include "EventFilter/StorageManager/interface/Exception.h"
00010 #include "EventFilter/StorageManager/interface/FragmentMonitorCollection.h"
00011 #include "EventFilter/StorageManager/interface/FragmentProcessor.h"
00012 #include "EventFilter/StorageManager/interface/SoapUtils.h"
00013 #include "EventFilter/StorageManager/interface/StorageManager.h"
00014 #include "EventFilter/StorageManager/interface/StateMachine.h"
00015 #include "EventFilter/StorageManager/src/ConsumerUtils.icc"
00016 
00017 #include "EventFilter/Utilities/interface/i2oEvfMsgs.h"
00018 
00019 #include "FWCore/Utilities/interface/EDMException.h"
00020 
00021 #include "i2o/Method.h"
00022 #include "interface/shared/version.h"
00023 #include "interface/shared/i2oXFunctionCodes.h"
00024 #include "xcept/tools.h"
00025 #include "xdaq/NamespaceURI.h"
00026 #include "xdata/InfoSpaceFactory.h"
00027 #include "xgi/Method.h"
00028 #include "xoap/Method.h"
00029 
00030 #include <boost/lexical_cast.hpp>
00031 #include <boost/shared_ptr.hpp>
00032 
00033 #include <cstdlib>
00034 
00035 using namespace std;
00036 using namespace stor;
00037 
00038 
00039 StorageManager::StorageManager(xdaq::ApplicationStub * s) :
00040   xdaq::Application(s)
00041 {  
00042   LOG4CPLUS_INFO(this->getApplicationLogger(),"Making StorageManager");
00043 
00044   // bind all callback functions
00045   bindI2OCallbacks();
00046   bindStateMachineCallbacks();
00047   bindWebInterfaceCallbacks();
00048   bindConsumerCallbacks();
00049 
00050   std::string errorMsg = "Exception in StorageManager constructor: ";
00051   try
00052   {
00053     initializeSharedResources();
00054   }
00055   catch(std::exception &e)
00056   {
00057     errorMsg += e.what();
00058     LOG4CPLUS_FATAL( getApplicationLogger(), e.what() );
00059     XCEPT_RAISE( stor::exception::Exception, e.what() );
00060   }
00061   catch(...)
00062   {
00063     errorMsg += "unknown exception";
00064     LOG4CPLUS_FATAL( getApplicationLogger(), errorMsg );
00065     XCEPT_RAISE( stor::exception::Exception, errorMsg );
00066   }
00067 
00068   startWorkerThreads();
00069 }
00070 
00071 
00072 void StorageManager::bindI2OCallbacks()
00073 {
00074   i2o::bind(this,
00075             &StorageManager::receiveRegistryMessage,
00076             I2O_SM_PREAMBLE,
00077             XDAQ_ORGANIZATION_ID);
00078   i2o::bind(this,
00079             &StorageManager::receiveDataMessage,
00080             I2O_SM_DATA,
00081             XDAQ_ORGANIZATION_ID);
00082   i2o::bind(this,
00083             &StorageManager::receiveErrorDataMessage,
00084             I2O_SM_ERROR,
00085             XDAQ_ORGANIZATION_ID);
00086   i2o::bind(this,
00087             &StorageManager::receiveDQMMessage,
00088             I2O_SM_DQM,
00089             XDAQ_ORGANIZATION_ID);
00090   i2o::bind(this,
00091             &StorageManager::receiveEndOfLumiSectionMessage,
00092             I2O_EVM_LUMISECTION,
00093             XDAQ_ORGANIZATION_ID);
00094 }
00095 
00096 
00097 void StorageManager::bindStateMachineCallbacks()
00098 {
00099   xoap::bind( this,
00100               &StorageManager::handleFSMSoapMessage,
00101               "Configure",
00102               XDAQ_NS_URI );
00103   xoap::bind( this,
00104               &StorageManager::handleFSMSoapMessage,
00105               "Enable",
00106               XDAQ_NS_URI );
00107   xoap::bind( this,
00108               &StorageManager::handleFSMSoapMessage,
00109               "Stop",
00110               XDAQ_NS_URI );
00111   xoap::bind( this,
00112               &StorageManager::handleFSMSoapMessage,
00113               "Halt",
00114               XDAQ_NS_URI );
00115   xoap::bind( this,
00116               &StorageManager::handleFSMSoapMessage,
00117               "EmergencyStop",
00118               XDAQ_NS_URI );
00119 }
00120 
00121 
00122 void StorageManager::bindWebInterfaceCallbacks()
00123 {
00124   xgi::bind(this,&StorageManager::css,                      "styles.css");
00125   xgi::bind(this,&StorageManager::defaultWebPage,           "Default");
00126   xgi::bind(this,&StorageManager::storedDataWebPage,        "storedData");
00127   xgi::bind(this,&StorageManager::rbsenderWebPage,          "rbsenderlist");
00128   xgi::bind(this,&StorageManager::rbsenderDetailWebPage,    "rbsenderdetail");
00129   xgi::bind(this,&StorageManager::fileStatisticsWebPage,    "fileStatistics");
00130   xgi::bind(this,&StorageManager::dqmEventStatisticsWebPage,"dqmEventStatistics");
00131   xgi::bind(this,&StorageManager::consumerStatisticsPage,   "consumerStatistics" );
00132   xgi::bind(this,&StorageManager::consumerListWebPage,      "consumerList");
00133   xgi::bind(this,&StorageManager::throughputWebPage,        "throughputStatistics");
00134 }
00135 
00136 
00137 void StorageManager::bindConsumerCallbacks()
00138 {
00139   // event consumers
00140   xgi::bind( this, &StorageManager::processConsumerRegistrationRequest, "registerConsumer" );
00141   xgi::bind( this, &StorageManager::processConsumerHeaderRequest, "getregdata" );
00142   xgi::bind( this, &StorageManager::processConsumerEventRequest, "geteventdata" );
00143 
00144   // dqm event consumers
00145   xgi::bind(this,&StorageManager::processDQMConsumerRegistrationRequest, "registerDQMConsumer");
00146   xgi::bind(this,&StorageManager::processDQMConsumerEventRequest, "getDQMeventdata");
00147 }
00148 
00149 
00150 void StorageManager::initializeSharedResources()
00151 {
00152   sharedResources_.reset(new SharedResources());
00153 
00154   xdata::InfoSpace *ispace = getApplicationInfoSpace();
00155   unsigned long instance = getApplicationDescriptor()->getInstance();
00156   sharedResources_->configuration_.reset(new Configuration(ispace, instance));
00157 
00158   QueueConfigurationParams queueParams =
00159     sharedResources_->configuration_->getQueueConfigurationParams();
00160   sharedResources_->commandQueue_.
00161     reset(new CommandQueue(queueParams.commandQueueSize_));
00162   sharedResources_->fragmentQueue_.
00163     reset(new FragmentQueue(queueParams.fragmentQueueSize_, queueParams.fragmentQueueMemoryLimitMB_ * 1024*1024));
00164   sharedResources_->registrationQueue_.
00165     reset(new RegistrationQueue(queueParams.registrationQueueSize_));
00166   sharedResources_->streamQueue_.
00167     reset(new StreamQueue(queueParams.streamQueueSize_, queueParams.streamQueueMemoryLimitMB_ * 1024*1024));
00168   sharedResources_->dqmEventQueue_.
00169     reset(new DQMEventQueue(queueParams.dqmEventQueueSize_, queueParams.dqmEventQueueMemoryLimitMB_ * 1024*1024));
00170 
00171   sharedResources_->alarmHandler_.reset( new AlarmHandler(this, sharedResources_) );
00172   sharedResources_->statisticsReporter_.reset(
00173     new StatisticsReporter(this, sharedResources_)
00174   );
00175   sharedResources_->initMsgCollection_.reset(new InitMsgCollection());
00176   sharedResources_->diskWriterResources_.reset(new DiskWriterResources());
00177   sharedResources_->dqmEventProcessorResources_.reset(new DQMEventProcessorResources());
00178 
00179   sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setFragmentQueue(sharedResources_->fragmentQueue_);
00180   sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setStreamQueue(sharedResources_->streamQueue_);
00181   sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setDQMEventQueue(sharedResources_->dqmEventQueue_);
00182 
00183   sharedResources_->
00184     discardManager_.reset(new DiscardManager(getApplicationContext(),
00185                                              getApplicationDescriptor(),
00186                                              sharedResources_->statisticsReporter_->
00187                                              getDataSenderMonitorCollection()));
00188 
00189   sharedResources_->registrationCollection_.reset( new RegistrationCollection() );
00190   EventConsumerMonitorCollection& ecmc = 
00191     sharedResources_->statisticsReporter_->getEventConsumerMonitorCollection();
00192   sharedResources_->eventQueueCollection_.reset( new EventQueueCollection( ecmc ) );
00193 
00194   DQMConsumerMonitorCollection& dcmc = 
00195     sharedResources_->statisticsReporter_->getDQMConsumerMonitorCollection();
00196   sharedResources_->dqmEventQueueCollection_.reset( new DQMEventQueueCollection( dcmc ) );
00197 
00198   consumerUtils_.reset( new ConsumerUtils_t(
00199       sharedResources_->configuration_,
00200       sharedResources_->registrationCollection_,
00201       sharedResources_->registrationQueue_,
00202       sharedResources_->initMsgCollection_,
00203       sharedResources_->eventQueueCollection_,
00204       sharedResources_->dqmEventQueueCollection_,
00205       sharedResources_->alarmHandler_
00206     ) );
00207 
00208   smWebPageHelper_.reset( new SMWebPageHelper(
00209       getApplicationDescriptor(), sharedResources_));
00210 
00211 }
00212 
00213 
00214 void StorageManager::startWorkerThreads()
00215 {
00216 
00217   // Start the workloops
00218   try
00219   {
00220     fragmentProcessor_.reset( new FragmentProcessor( this, sharedResources_ ) );
00221     diskWriter_.reset( new DiskWriter(this, sharedResources_) );
00222     dqmEventProcessor_.reset( new DQMEventProcessor(this, sharedResources_) );
00223     sharedResources_->statisticsReporter_->startWorkLoop("theStatisticsReporter");
00224     fragmentProcessor_->startWorkLoop("theFragmentProcessor");
00225     diskWriter_->startWorkLoop("theDiskWriter");
00226     dqmEventProcessor_->startWorkLoop("theDQMEventProcessor");
00227   }
00228   catch(xcept::Exception &e)
00229   {
00230     sharedResources_->alarmHandler_->moveToFailedState( e );
00231   }
00232   catch(std::exception &e)
00233   {
00234     XCEPT_DECLARE(stor::exception::Exception,
00235       sentinelException, e.what());
00236     sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00237   }
00238   catch(...)
00239   {
00240     std::string errorMsg = "Unknown exception when starting the workloops";
00241     XCEPT_DECLARE(stor::exception::Exception,
00242       sentinelException, errorMsg);
00243     sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00244   }
00245 }
00246 
00247 
00249 // I2O call back functions //
00251 
00252 void StorageManager::receiveRegistryMessage(toolbox::mem::Reference *ref)
00253 {
00254   I2OChain i2oChain(ref);
00255 
00256   // Set the I2O message pool pointer. Only done for init messages.
00257   ThroughputMonitorCollection& throughputMonCollection =
00258     sharedResources_->statisticsReporter_->getThroughputMonitorCollection();
00259   throughputMonCollection.setMemoryPoolPointer( ref->getBuffer()->getPool() );
00260 
00261   FragmentMonitorCollection& fragMonCollection =
00262     sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00263   fragMonCollection.getAllFragmentSizeMQ().addSample( 
00264     static_cast<double>( i2oChain.totalDataSize() ) / 0x100000
00265   );
00266 
00267   sharedResources_->fragmentQueue_->enqWait(i2oChain);
00268 }
00269 
00270 
00271 void StorageManager::receiveDataMessage(toolbox::mem::Reference *ref)
00272 {
00273   I2OChain i2oChain(ref);
00274 
00275   FragmentMonitorCollection& fragMonCollection =
00276     sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00277   fragMonCollection.addEventFragmentSample( i2oChain.totalDataSize() );
00278 
00279   sharedResources_->fragmentQueue_->enqWait(i2oChain);
00280 
00281 #ifdef STOR_DEBUG_DUPLICATE_MESSAGES
00282   double r = rand()/static_cast<double>(RAND_MAX);
00283   if (r < 0.001)
00284   {
00285     LOG4CPLUS_INFO(this->getApplicationLogger(), "Simulating duplicated data message");
00286     receiveDataMessage(ref->duplicate());
00287   }
00288 #endif
00289 }
00290 
00291 
00292 void StorageManager::receiveErrorDataMessage(toolbox::mem::Reference *ref)
00293 {
00294   I2OChain i2oChain(ref);
00295 
00296   FragmentMonitorCollection& fragMonCollection =
00297     sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00298   fragMonCollection.addEventFragmentSample( i2oChain.totalDataSize() );
00299 
00300   sharedResources_->fragmentQueue_->enqWait(i2oChain);
00301 }
00302 
00303 
00304 void StorageManager::receiveDQMMessage(toolbox::mem::Reference *ref)
00305 {
00306   I2OChain i2oChain(ref);
00307 
00308   FragmentMonitorCollection& fragMonCollection =
00309     sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00310   fragMonCollection.addDQMEventFragmentSample( i2oChain.totalDataSize() );
00311 
00312   sharedResources_->fragmentQueue_->enqWait(i2oChain);
00313 }
00314 
00315 
00316 void StorageManager::receiveEndOfLumiSectionMessage(toolbox::mem::Reference *ref)
00317 {
00318   I2OChain i2oChain( ref );
00319 
00320   FragmentMonitorCollection& fragMonCollection =
00321     sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00322   fragMonCollection.addFragmentSample( i2oChain.totalDataSize() );
00323 
00324   RunMonitorCollection& runMonCollection =
00325     sharedResources_->statisticsReporter_->getRunMonitorCollection();
00326   runMonCollection.getEoLSSeenMQ().addSample( i2oChain.lumiSection() );
00327 
00328   sharedResources_->streamQueue_->enqWait( i2oChain );
00329 }
00330 
00331 
00333 // Web interface call back functions //
00335 
00336 void StorageManager::css(xgi::Input *in, xgi::Output *out)
00337 throw (xgi::exception::Exception)
00338 {
00339   smWebPageHelper_->css(in,out);
00340 }
00341 
00342 
00343 void StorageManager::defaultWebPage(xgi::Input *in, xgi::Output *out)
00344 throw (xgi::exception::Exception)
00345 {
00346   std::string errorMsg = "Failed to create the default webpage";
00347   
00348   try
00349   {
00350     smWebPageHelper_->defaultWebPage(out);
00351   }
00352   catch(std::exception &e)
00353   {
00354     errorMsg += ": ";
00355     errorMsg += e.what();
00356     
00357     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00358     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00359   }
00360   catch(...)
00361   {
00362     errorMsg += ": Unknown exception";
00363     
00364     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00365     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00366   }
00367 }
00368 
00369 
00370 void StorageManager::storedDataWebPage(xgi::Input *in, xgi::Output *out)
00371   throw (xgi::exception::Exception)
00372 {
00373   std::string errorMsg = "Failed to create the stored data webpage";
00374 
00375   try
00376   {
00377     smWebPageHelper_->storedDataWebPage(out);
00378   }
00379   catch(std::exception &e)
00380   {
00381     errorMsg += ": ";
00382     errorMsg += e.what();
00383     
00384     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00385     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00386   }
00387   catch(...)
00388   {
00389     errorMsg += ": Unknown exception";
00390     
00391     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00392     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00393   }
00394 }
00395 
00396 
00397 void StorageManager::consumerStatisticsPage( xgi::Input* in,
00398                                              xgi::Output* out )
00399   throw( xgi::exception::Exception )
00400 {
00401 
00402   std::string err_msg =
00403     "Failed to create consumer statistics page";
00404 
00405   try
00406   {
00407     smWebPageHelper_->consumerStatistics(out);
00408   }
00409   catch( std::exception &e )
00410   {
00411     err_msg += ": ";
00412     err_msg += e.what();
00413     LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
00414     XCEPT_RAISE( xgi::exception::Exception, err_msg );
00415   }
00416   catch(...)
00417   {
00418     err_msg += ": Unknown exception";
00419     LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
00420     XCEPT_RAISE( xgi::exception::Exception, err_msg );
00421   }
00422 
00423 }
00424 
00425 
00426 void StorageManager::rbsenderWebPage(xgi::Input *in, xgi::Output *out)
00427   throw (xgi::exception::Exception)
00428 {
00429   std::string errorMsg = "Failed to create the data sender webpage";
00430 
00431   try
00432   {
00433     smWebPageHelper_->resourceBrokerOverview(out);
00434   }
00435   catch(std::exception &e)
00436   {
00437     errorMsg += ": ";
00438     errorMsg += e.what();
00439     
00440     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00441     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00442   }
00443   catch(...)
00444   {
00445     errorMsg += ": Unknown exception";
00446     
00447     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00448     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00449   }
00450 
00451 }
00452 
00453 
00454 void StorageManager::rbsenderDetailWebPage(xgi::Input *in, xgi::Output *out)
00455   throw (xgi::exception::Exception)
00456 {
00457   std::string errorMsg = "Failed to create the data sender webpage";
00458 
00459   try
00460   {
00461     long long localRBID = 0;
00462     cgicc::Cgicc cgiWrapper(in);
00463     cgicc::const_form_iterator updateRef = cgiWrapper.getElement("id");
00464     if (updateRef != cgiWrapper.getElements().end())
00465     {
00466       std::string idString = updateRef->getValue();
00467       localRBID = boost::lexical_cast<long long>(idString);
00468     }
00469 
00470     smWebPageHelper_->resourceBrokerDetail(out, localRBID);
00471   }
00472   catch(std::exception &e)
00473   {
00474     errorMsg += ": ";
00475     errorMsg += e.what();
00476     
00477     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00478     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00479   }
00480   catch(...)
00481   {
00482     errorMsg += ": Unknown exception";
00483     
00484     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00485     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00486   }
00487 
00488 }
00489 
00490 
00491 void StorageManager::fileStatisticsWebPage(xgi::Input *in, xgi::Output *out)
00492   throw (xgi::exception::Exception)
00493 {
00494   std::string errorMsg = "Failed to create the file statistics webpage";
00495 
00496   try
00497   {
00498     smWebPageHelper_->filesWebPage(out);
00499   }
00500   catch(std::exception &e)
00501   {
00502     errorMsg += ": ";
00503     errorMsg += e.what();
00504     
00505     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00506     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00507   }
00508   catch(...)
00509   {
00510     errorMsg += ": Unknown exception";
00511     
00512     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00513     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00514   }
00515 
00516 }
00517 
00518 
00519 void StorageManager::dqmEventStatisticsWebPage(xgi::Input *in, xgi::Output *out)
00520   throw (xgi::exception::Exception)
00521 {
00522   std::string errorMsg = "Failed to create the DQM event statistics webpage";
00523 
00524   try
00525   {
00526     smWebPageHelper_->dqmEventWebPage(out);
00527   }
00528   catch(std::exception &e)
00529   {
00530     errorMsg += ": ";
00531     errorMsg += e.what();
00532     
00533     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00534     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00535   }
00536   catch(...)
00537   {
00538     errorMsg += ": Unknown exception";
00539     
00540     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00541     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00542   }
00543 }
00544 
00545 
00546 void StorageManager::throughputWebPage(xgi::Input *in, xgi::Output *out)
00547   throw (xgi::exception::Exception)
00548 {
00549   std::string errorMsg = "Failed to create the throughput statistics webpage";
00550 
00551   try
00552   {
00553     smWebPageHelper_->throughputWebPage(out);
00554   }
00555   catch(std::exception &e)
00556   {
00557     errorMsg += ": ";
00558     errorMsg += e.what();
00559     
00560     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00561     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00562   }
00563   catch(...)
00564   {
00565     errorMsg += ": Unknown exception";
00566     
00567     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00568     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00569   }
00570 
00571 }
00572 
00573 
00574 // Leaving in for now but making it return an empty buffer:
00575 void StorageManager::consumerListWebPage(xgi::Input *in, xgi::Output *out)
00576   throw (xgi::exception::Exception)
00577 {
00578   out->getHTTPResponseHeader().addHeader( "Content-Type",
00579                                           "application/octet-stream" );
00580   out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
00581                                           "binary" );
00582   char buff;
00583   out->write( &buff, 0 );
00584 }
00585 
00586 
00588 // State Machine call back functions //
00590 
00591 xoap::MessageReference StorageManager::handleFSMSoapMessage( xoap::MessageReference msg )
00592   throw( xoap::exception::Exception )
00593 {
00594   std::string errorMsg;
00595   xoap::MessageReference returnMsg;
00596 
00597   try {
00598     errorMsg = "Failed to extract FSM event and parameters from SOAP message: ";
00599     std::string command = soaputils::extractParameters(msg, this);
00600 
00601     errorMsg = "Failed to put a '" + command + "' state machine event into command queue: ";
00602     if (command == "Configure")
00603     {
00604       sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Configure() ) );
00605     }
00606     else if (command == "Enable")
00607     {
00608       if (sharedResources_->configuration_->streamConfigurationHasChanged())
00609       {
00610         sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Reconfigure() ) );
00611       }
00612       sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Enable() ) );
00613     }
00614     else if (command == "Stop")
00615     {
00616       sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Stop() ) );
00617     }
00618     else if (command == "Halt")
00619     {
00620       sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Halt() ) );
00621     }
00622     else if (command == "EmergencyStop")
00623     {
00624       sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::EmergencyStop() ) );
00625     }
00626     else
00627     {
00628       XCEPT_RAISE(stor::exception::StateMachine,
00629         "Received an unknown state machine event '" + command + "'.");
00630     }
00631 
00632     errorMsg = "Failed to create FSM SOAP reply message: ";
00633     returnMsg = soaputils::createFsmSoapResponseMsg(command,
00634       sharedResources_->statisticsReporter_->
00635       getStateMachineMonitorCollection().externallyVisibleState());
00636   }
00637   catch (cms::Exception& e) {
00638     errorMsg += e.explainSelf();
00639     XCEPT_DECLARE(xoap::exception::Exception,
00640       sentinelException, errorMsg);
00641     sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00642     throw sentinelException;
00643   }
00644   catch (xcept::Exception &e) {
00645     XCEPT_DECLARE_NESTED(xoap::exception::Exception,
00646       sentinelException, errorMsg, e);
00647     sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00648     throw sentinelException;
00649   }
00650   catch (std::exception& e) {
00651     errorMsg += e.what();
00652     XCEPT_DECLARE(xoap::exception::Exception,
00653       sentinelException, errorMsg);
00654     sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00655     throw sentinelException;
00656   }
00657   catch (...) {
00658     errorMsg += "Unknown exception";
00659     XCEPT_DECLARE(xoap::exception::Exception,
00660       sentinelException, errorMsg);
00661     sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00662     throw sentinelException;
00663   }
00664 
00665   return returnMsg;
00666 }
00667 
00668 
00672 
00673 void
00674 StorageManager::processConsumerRegistrationRequest( xgi::Input* in, xgi::Output* out )
00675   throw( xgi::exception::Exception )
00676 {
00677   consumerUtils_->processConsumerRegistrationRequest(in,out);
00678 }
00679 
00680 
00681 void
00682 StorageManager::processConsumerHeaderRequest( xgi::Input* in, xgi::Output* out )
00683   throw( xgi::exception::Exception )
00684 {
00685   consumerUtils_->processConsumerHeaderRequest(in,out);
00686 }
00687 
00688 
00689 void
00690 StorageManager::processConsumerEventRequest( xgi::Input* in, xgi::Output* out )
00691   throw( xgi::exception::Exception )
00692 {
00693   consumerUtils_->processConsumerEventRequest(in,out);
00694 }
00695 
00696 
00697 void
00698 StorageManager::processDQMConsumerRegistrationRequest( xgi::Input* in, xgi::Output* out )
00699   throw( xgi::exception::Exception )
00700 {
00701   consumerUtils_->processDQMConsumerRegistrationRequest(in,out);
00702 }
00703 
00704 
00705 void
00706 StorageManager::processDQMConsumerEventRequest( xgi::Input* in, xgi::Output* out )
00707   throw( xgi::exception::Exception )
00708 {
00709   consumerUtils_->processDQMConsumerEventRequest(in,out);
00710 }
00711 
00712 namespace stor {
00714   // Specialization for ConsumerUtils //
00716   template<>
00717   void
00718   ConsumerUtils<Configuration,EventQueueCollection>::
00719   writeConsumerEvent(xgi::Output* out, const I2OChain& evt) const
00720   {
00721     writeHTTPHeaders( out );
00722     
00723     #ifdef STOR_DEBUG_CORRUPTED_EVENT_HEADER
00724     double r = rand()/static_cast<double>(RAND_MAX);
00725     if (r < 0.1)
00726     {
00727       std::cout << "Simulating corrupted event header" << std::endl;
00728       EventHeader* h = (EventHeader*)evt.dataLocation(0);
00729       h->protocolVersion_ = 1;
00730     }
00731     #endif // STOR_DEBUG_CORRUPTED_EVENT_HEADER
00732     
00733     const unsigned int nfrags = evt.fragmentCount();
00734     for ( unsigned int i = 0; i < nfrags; ++i )
00735     {
00736       const unsigned long len = evt.dataSize( i );
00737       unsigned char* location = evt.dataLocation( i );
00738       out->write( (char*)location, len );
00739     } 
00740   }
00741 }
00742 
00743 
00745 // *** Provides the factory method for the instantiation of SM applications //
00747 // This macro is deprecated:
00748 XDAQ_INSTANTIATE(StorageManager)
00749 
00750 // One should use XDAQ_INSTANTIATOR() in the header file
00751 // and the macro below in this file. But this breaks backward compatibility,
00752 // as all xml configuration files would have to be changed to use
00753 // 'stor::StorageManager' instead of 'StorageManager'.
00754 // XDAQ_INSTANTIATOR_IMPL(stor::StorageManager)
00755 
00756 
00757 
00758 
00759 
00760 
00761 
00762