CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_6_1_1/src/EventFilter/StorageManager/src/StorageManager.cc

Go to the documentation of this file.
00001 // $Id: StorageManager.cc,v 1.139 2013/01/07 11:30:00 eulisse Exp $
00003 
00004 #include "EventFilter/StorageManager/interface/DiskWriter.h"
00005 #include "EventFilter/StorageManager/interface/DiskWriterResources.h"
00006 #include "EventFilter/StorageManager/interface/DQMEventProcessor.h"
00007 #include "EventFilter/StorageManager/interface/DQMEventProcessorResources.h"
00008 #include "EventFilter/StorageManager/interface/DiscardManager.h"
00009 #include "EventFilter/StorageManager/interface/Exception.h"
00010 #include "EventFilter/StorageManager/interface/FragmentMonitorCollection.h"
00011 #include "EventFilter/StorageManager/interface/FragmentProcessor.h"
00012 #include "EventFilter/StorageManager/interface/SoapUtils.h"
00013 #include "EventFilter/StorageManager/interface/StorageManager.h"
00014 #include "EventFilter/StorageManager/interface/StateMachine.h"
00015 #include "EventFilter/StorageManager/src/ConsumerUtils.icc"
00016 
00017 #include "EventFilter/Utilities/interface/i2oEvfMsgs.h"
00018 
00019 #include "FWCore/Utilities/interface/EDMException.h"
00020 
00021 #include "interface/shared/version.h"
00022 #include "interface/shared/i2oXFunctionCodes.h"
00023 #include "xcept/tools.h"
00024 #include "xdaq/NamespaceURI.h"
00025 #include "xdata/InfoSpaceFactory.h"
00026 #include "xgi/Method.h"
00027 #include "xoap/Method.h"
00028 
00029 #include <boost/lexical_cast.hpp>
00030 #include <boost/shared_ptr.hpp>
00031 
00032 #include <cstdlib>
00033 
00034 using namespace std;
00035 using namespace stor;
00036 
00037 
00038 StorageManager::StorageManager(xdaq::ApplicationStub * s) :
00039   xdaq::Application(s)
00040 {  
00041   LOG4CPLUS_INFO(this->getApplicationLogger(),"Making StorageManager");
00042 
00043   // bind all callback functions
00044   bindI2OCallbacks();
00045   bindStateMachineCallbacks();
00046   bindWebInterfaceCallbacks();
00047   bindConsumerCallbacks();
00048 
00049   std::string errorMsg = "Exception in StorageManager constructor: ";
00050   try
00051   {
00052     initializeSharedResources();
00053   }
00054   catch(std::exception &e)
00055   {
00056     errorMsg += e.what();
00057     LOG4CPLUS_FATAL( getApplicationLogger(), e.what() );
00058     XCEPT_RAISE( stor::exception::Exception, e.what() );
00059   }
00060   catch(...)
00061   {
00062     errorMsg += "unknown exception";
00063     LOG4CPLUS_FATAL( getApplicationLogger(), errorMsg );
00064     XCEPT_RAISE( stor::exception::Exception, errorMsg );
00065   }
00066 
00067   startWorkerThreads();
00068 }
00069 
00070 
00071 void StorageManager::bindI2OCallbacks()
00072 {
00073   i2o::bind(this,
00074             &StorageManager::receiveRegistryMessage,
00075             I2O_SM_PREAMBLE,
00076             XDAQ_ORGANIZATION_ID);
00077   i2o::bind(this,
00078             &StorageManager::receiveDataMessage,
00079             I2O_SM_DATA,
00080             XDAQ_ORGANIZATION_ID);
00081   i2o::bind(this,
00082             &StorageManager::receiveErrorDataMessage,
00083             I2O_SM_ERROR,
00084             XDAQ_ORGANIZATION_ID);
00085   i2o::bind(this,
00086             &StorageManager::receiveDQMMessage,
00087             I2O_SM_DQM,
00088             XDAQ_ORGANIZATION_ID);
00089   i2o::bind(this,
00090             &StorageManager::receiveEndOfLumiSectionMessage,
00091             I2O_EVM_LUMISECTION,
00092             XDAQ_ORGANIZATION_ID);
00093 }
00094 
00095 
00096 void StorageManager::bindStateMachineCallbacks()
00097 {
00098   xoap::bind( this,
00099               &StorageManager::handleFSMSoapMessage,
00100               "Configure",
00101               XDAQ_NS_URI );
00102   xoap::bind( this,
00103               &StorageManager::handleFSMSoapMessage,
00104               "Enable",
00105               XDAQ_NS_URI );
00106   xoap::bind( this,
00107               &StorageManager::handleFSMSoapMessage,
00108               "Stop",
00109               XDAQ_NS_URI );
00110   xoap::bind( this,
00111               &StorageManager::handleFSMSoapMessage,
00112               "Halt",
00113               XDAQ_NS_URI );
00114   xoap::bind( this,
00115               &StorageManager::handleFSMSoapMessage,
00116               "EmergencyStop",
00117               XDAQ_NS_URI );
00118 }
00119 
00120 
00121 void StorageManager::bindWebInterfaceCallbacks()
00122 {
00123   xgi::bind(this,&StorageManager::css,                      "styles.css");
00124   xgi::bind(this,&StorageManager::defaultWebPage,           "Default");
00125   xgi::bind(this,&StorageManager::inputWebPage,             "input");
00126   xgi::bind(this,&StorageManager::storedDataWebPage,        "storedData");
00127   xgi::bind(this,&StorageManager::rbsenderWebPage,          "rbsenderlist");
00128   xgi::bind(this,&StorageManager::rbsenderDetailWebPage,    "rbsenderdetail");
00129   xgi::bind(this,&StorageManager::fileStatisticsWebPage,    "fileStatistics");
00130   xgi::bind(this,&StorageManager::dqmEventStatisticsWebPage,"dqmEventStatistics");
00131   xgi::bind(this,&StorageManager::consumerStatisticsPage,   "consumerStatistics" );
00132   xgi::bind(this,&StorageManager::consumerListWebPage,      "consumerList");
00133   xgi::bind(this,&StorageManager::throughputWebPage,        "throughputStatistics");
00134 }
00135 
00136 
00137 void StorageManager::bindConsumerCallbacks()
00138 {
00139   // event consumers
00140   xgi::bind( this, &StorageManager::processConsumerRegistrationRequest, "registerConsumer" );
00141   xgi::bind( this, &StorageManager::processConsumerHeaderRequest, "getregdata" );
00142   xgi::bind( this, &StorageManager::processConsumerEventRequest, "geteventdata" );
00143 
00144   // dqm event consumers
00145   xgi::bind(this,&StorageManager::processDQMConsumerRegistrationRequest, "registerDQMConsumer");
00146   xgi::bind(this,&StorageManager::processDQMConsumerEventRequest, "getDQMeventdata");
00147 }
00148 
00149 
00150 void StorageManager::initializeSharedResources()
00151 {
00152   sharedResources_.reset(new SharedResources());
00153 
00154   xdata::InfoSpace *ispace = getApplicationInfoSpace();
00155   unsigned long instance = getApplicationDescriptor()->getInstance();
00156   sharedResources_->configuration_.reset(new Configuration(ispace, instance));
00157 
00158   QueueConfigurationParams queueParams =
00159     sharedResources_->configuration_->getQueueConfigurationParams();
00160   sharedResources_->commandQueue_.
00161     reset(new CommandQueue(queueParams.commandQueueSize_));
00162   sharedResources_->fragmentQueue_.
00163     reset(new FragmentQueue(queueParams.fragmentQueueSize_, queueParams.fragmentQueueMemoryLimitMB_ * 1024*1024));
00164   sharedResources_->registrationQueue_.
00165     reset(new RegistrationQueue(queueParams.registrationQueueSize_));
00166   sharedResources_->streamQueue_.
00167     reset(new StreamQueue(queueParams.streamQueueSize_, queueParams.streamQueueMemoryLimitMB_ * 1024*1024));
00168   sharedResources_->dqmEventQueue_.
00169     reset(new DQMEventQueue(queueParams.dqmEventQueueSize_, queueParams.dqmEventQueueMemoryLimitMB_ * 1024*1024));
00170 
00171   sharedResources_->alarmHandler_.reset( new AlarmHandler(this, sharedResources_) );
00172   sharedResources_->statisticsReporter_.reset(
00173     new StatisticsReporter(this, sharedResources_)
00174   );
00175   sharedResources_->initMsgCollection_.reset(new InitMsgCollection());
00176   sharedResources_->diskWriterResources_.reset(new DiskWriterResources());
00177   sharedResources_->dqmEventProcessorResources_.reset(new DQMEventProcessorResources());
00178 
00179   sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setFragmentQueue(sharedResources_->fragmentQueue_);
00180   sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setStreamQueue(sharedResources_->streamQueue_);
00181   sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setDQMEventQueue(sharedResources_->dqmEventQueue_);
00182 
00183   sharedResources_->
00184     discardManager_.reset(new DiscardManager(getApplicationContext(),
00185                                              getApplicationDescriptor(),
00186                                              sharedResources_->statisticsReporter_->
00187                                              getDataSenderMonitorCollection()));
00188 
00189   sharedResources_->registrationCollection_.reset( new RegistrationCollection() );
00190   EventConsumerMonitorCollection& ecmc = 
00191     sharedResources_->statisticsReporter_->getEventConsumerMonitorCollection();
00192   sharedResources_->eventQueueCollection_.reset( new EventQueueCollection( ecmc ) );
00193 
00194   DQMConsumerMonitorCollection& dcmc = 
00195     sharedResources_->statisticsReporter_->getDQMConsumerMonitorCollection();
00196   sharedResources_->dqmEventQueueCollection_.reset( new DQMEventQueueCollection( dcmc ) );
00197 
00198   consumerUtils_.reset( new ConsumerUtils_t(
00199       sharedResources_->configuration_,
00200       sharedResources_->registrationCollection_,
00201       sharedResources_->registrationQueue_,
00202       sharedResources_->initMsgCollection_,
00203       sharedResources_->eventQueueCollection_,
00204       sharedResources_->dqmEventQueueCollection_,
00205       sharedResources_->alarmHandler_
00206     ) );
00207 
00208   smWebPageHelper_.reset( new SMWebPageHelper(
00209       getApplicationDescriptor(), sharedResources_));
00210 
00211 }
00212 
00213 
00214 void StorageManager::startWorkerThreads()
00215 {
00216 
00217   // Start the workloops
00218   try
00219   {
00220     fragmentProcessor_.reset( new FragmentProcessor( this, sharedResources_ ) );
00221     diskWriter_.reset( new DiskWriter(this, sharedResources_) );
00222     dqmEventProcessor_.reset( new DQMEventProcessor(this, sharedResources_) );
00223     sharedResources_->statisticsReporter_->startWorkLoop("theStatisticsReporter");
00224     fragmentProcessor_->startWorkLoop("theFragmentProcessor");
00225     diskWriter_->startWorkLoop("theDiskWriter");
00226     dqmEventProcessor_->startWorkLoop("theDQMEventProcessor");
00227   }
00228   catch(xcept::Exception &e)
00229   {
00230     sharedResources_->alarmHandler_->moveToFailedState( e );
00231   }
00232   catch(std::exception &e)
00233   {
00234     XCEPT_DECLARE(stor::exception::Exception,
00235       sentinelException, e.what());
00236     sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00237   }
00238   catch(...)
00239   {
00240     std::string errorMsg = "Unknown exception when starting the workloops";
00241     XCEPT_DECLARE(stor::exception::Exception,
00242       sentinelException, errorMsg);
00243     sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00244   }
00245 }
00246 
00247 
00249 // I2O call back functions //
00251 
00252 void StorageManager::receiveRegistryMessage(toolbox::mem::Reference *ref) throw (i2o::exception::Exception)
00253 {
00254   I2OChain i2oChain(ref);
00255 
00256   // Set the I2O message pool pointer. Only done for init messages.
00257   ThroughputMonitorCollection& throughputMonCollection =
00258     sharedResources_->statisticsReporter_->getThroughputMonitorCollection();
00259   throughputMonCollection.setMemoryPoolPointer( ref->getBuffer()->getPool() );
00260 
00261   FragmentMonitorCollection& fragMonCollection =
00262     sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00263   fragMonCollection.getAllFragmentSizeMQ().addSample( 
00264     static_cast<double>( i2oChain.totalDataSize() ) / 0x100000
00265   );
00266 
00267   sharedResources_->fragmentQueue_->enqWait(i2oChain);
00268 }
00269 
00270 
00271 void StorageManager::receiveDataMessage(toolbox::mem::Reference *ref) throw (i2o::exception::Exception)
00272 {
00273   I2OChain i2oChain(ref);
00274 
00275   FragmentMonitorCollection& fragMonCollection =
00276     sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00277   fragMonCollection.addEventFragmentSample( i2oChain.totalDataSize() );
00278 
00279   sharedResources_->fragmentQueue_->enqWait(i2oChain);
00280 
00281 #ifdef STOR_DEBUG_DUPLICATE_MESSAGES
00282   double r = rand()/static_cast<double>(RAND_MAX);
00283   if (r < 0.001)
00284   {
00285     LOG4CPLUS_INFO(this->getApplicationLogger(), "Simulating duplicated data message");
00286     receiveDataMessage(ref->duplicate());
00287   }
00288 #endif
00289 }
00290 
00291 
00292 void StorageManager::receiveErrorDataMessage(toolbox::mem::Reference *ref) throw (i2o::exception::Exception)
00293 {
00294   I2OChain i2oChain(ref);
00295 
00296   FragmentMonitorCollection& fragMonCollection =
00297     sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00298   fragMonCollection.addEventFragmentSample( i2oChain.totalDataSize() );
00299 
00300   sharedResources_->fragmentQueue_->enqWait(i2oChain);
00301 }
00302 
00303 
00304 void StorageManager::receiveDQMMessage(toolbox::mem::Reference *ref) throw (i2o::exception::Exception)
00305 {
00306   I2OChain i2oChain(ref);
00307 
00308   FragmentMonitorCollection& fragMonCollection =
00309     sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00310   fragMonCollection.addDQMEventFragmentSample( i2oChain.totalDataSize() );
00311 
00312   sharedResources_->fragmentQueue_->enqWait(i2oChain);
00313 }
00314 
00315 
00316 void StorageManager::receiveEndOfLumiSectionMessage(toolbox::mem::Reference *ref) throw (i2o::exception::Exception)
00317 {
00318   I2OChain i2oChain( ref );
00319 
00320   FragmentMonitorCollection& fragMonCollection =
00321     sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00322   fragMonCollection.addFragmentSample( i2oChain.totalDataSize() );
00323 
00324   RunMonitorCollection& runMonCollection =
00325     sharedResources_->statisticsReporter_->getRunMonitorCollection();
00326   runMonCollection.getEoLSSeenMQ().addSample( i2oChain.lumiSection() );
00327 
00328   sharedResources_->streamQueue_->enqWait( i2oChain );
00329 }
00330 
00331 
00333 // Web interface call back functions //
00335 
00336 void StorageManager::css(xgi::Input *in, xgi::Output *out)
00337 throw (xgi::exception::Exception)
00338 {
00339   smWebPageHelper_->css(in,out);
00340 }
00341 
00342 
00343 void StorageManager::defaultWebPage(xgi::Input *in, xgi::Output *out)
00344 throw (xgi::exception::Exception)
00345 {
00346   std::string errorMsg = "Failed to create the default webpage";
00347   
00348   try
00349   {
00350     smWebPageHelper_->defaultWebPage(out);
00351   }
00352   catch(std::exception &e)
00353   {
00354     errorMsg += ": ";
00355     errorMsg += e.what();
00356     
00357     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00358     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00359   }
00360   catch(...)
00361   {
00362     errorMsg += ": Unknown exception";
00363     
00364     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00365     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00366   }
00367 }
00368 
00369 
00370 void StorageManager::inputWebPage(xgi::Input *in, xgi::Output *out)
00371   throw (xgi::exception::Exception)
00372 {
00373   std::string errorMsg = "Failed to create the I2O input webpage";
00374 
00375   try
00376   {
00377     smWebPageHelper_->inputWebPage(out);
00378   }
00379   catch(std::exception &e)
00380   {
00381     errorMsg += ": ";
00382     errorMsg += e.what();
00383     
00384     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00385     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00386   }
00387   catch(...)
00388   {
00389     errorMsg += ": Unknown exception";
00390     
00391     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00392     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00393   }
00394 }
00395 
00396 
00397 void StorageManager::storedDataWebPage(xgi::Input *in, xgi::Output *out)
00398   throw (xgi::exception::Exception)
00399 {
00400   std::string errorMsg = "Failed to create the stored data webpage";
00401 
00402   try
00403   {
00404     smWebPageHelper_->storedDataWebPage(out);
00405   }
00406   catch(std::exception &e)
00407   {
00408     errorMsg += ": ";
00409     errorMsg += e.what();
00410     
00411     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00412     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00413   }
00414   catch(...)
00415   {
00416     errorMsg += ": Unknown exception";
00417     
00418     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00419     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00420   }
00421 }
00422 
00423 
00424 void StorageManager::consumerStatisticsPage( xgi::Input* in,
00425                                              xgi::Output* out )
00426   throw( xgi::exception::Exception )
00427 {
00428 
00429   std::string err_msg =
00430     "Failed to create consumer statistics page";
00431 
00432   try
00433   {
00434     smWebPageHelper_->consumerStatistics(out);
00435   }
00436   catch( std::exception &e )
00437   {
00438     err_msg += ": ";
00439     err_msg += e.what();
00440     LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
00441     XCEPT_RAISE( xgi::exception::Exception, err_msg );
00442   }
00443   catch(...)
00444   {
00445     err_msg += ": Unknown exception";
00446     LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
00447     XCEPT_RAISE( xgi::exception::Exception, err_msg );
00448   }
00449 
00450 }
00451 
00452 
00453 void StorageManager::rbsenderWebPage(xgi::Input *in, xgi::Output *out)
00454   throw (xgi::exception::Exception)
00455 {
00456   std::string errorMsg = "Failed to create the data sender webpage";
00457 
00458   try
00459   {
00460     smWebPageHelper_->resourceBrokerOverview(out);
00461   }
00462   catch(std::exception &e)
00463   {
00464     errorMsg += ": ";
00465     errorMsg += e.what();
00466     
00467     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00468     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00469   }
00470   catch(...)
00471   {
00472     errorMsg += ": Unknown exception";
00473     
00474     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00475     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00476   }
00477 
00478 }
00479 
00480 
00481 void StorageManager::rbsenderDetailWebPage(xgi::Input *in, xgi::Output *out)
00482   throw (xgi::exception::Exception)
00483 {
00484   std::string errorMsg = "Failed to create the data sender webpage";
00485 
00486   try
00487   {
00488     long long localRBID = 0;
00489     cgicc::Cgicc cgiWrapper(in);
00490     cgicc::const_form_iterator updateRef = cgiWrapper.getElement("id");
00491     if (updateRef != cgiWrapper.getElements().end())
00492     {
00493       std::string idString = updateRef->getValue();
00494       localRBID = boost::lexical_cast<long long>(idString);
00495     }
00496 
00497     smWebPageHelper_->resourceBrokerDetail(out, localRBID);
00498   }
00499   catch(std::exception &e)
00500   {
00501     errorMsg += ": ";
00502     errorMsg += e.what();
00503     
00504     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00505     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00506   }
00507   catch(...)
00508   {
00509     errorMsg += ": Unknown exception";
00510     
00511     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00512     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00513   }
00514 
00515 }
00516 
00517 
00518 void StorageManager::fileStatisticsWebPage(xgi::Input *in, xgi::Output *out)
00519   throw (xgi::exception::Exception)
00520 {
00521   std::string errorMsg = "Failed to create the file statistics webpage";
00522 
00523   try
00524   {
00525     smWebPageHelper_->filesWebPage(out);
00526   }
00527   catch(std::exception &e)
00528   {
00529     errorMsg += ": ";
00530     errorMsg += e.what();
00531     
00532     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00533     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00534   }
00535   catch(...)
00536   {
00537     errorMsg += ": Unknown exception";
00538     
00539     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00540     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00541   }
00542 
00543 }
00544 
00545 
00546 void StorageManager::dqmEventStatisticsWebPage(xgi::Input *in, xgi::Output *out)
00547   throw (xgi::exception::Exception)
00548 {
00549   std::string errorMsg = "Failed to create the DQM event statistics webpage";
00550 
00551   try
00552   {
00553     smWebPageHelper_->dqmEventWebPage(out);
00554   }
00555   catch(std::exception &e)
00556   {
00557     errorMsg += ": ";
00558     errorMsg += e.what();
00559     
00560     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00561     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00562   }
00563   catch(...)
00564   {
00565     errorMsg += ": Unknown exception";
00566     
00567     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00568     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00569   }
00570 }
00571 
00572 
00573 void StorageManager::throughputWebPage(xgi::Input *in, xgi::Output *out)
00574   throw (xgi::exception::Exception)
00575 {
00576   std::string errorMsg = "Failed to create the throughput statistics webpage";
00577 
00578   try
00579   {
00580     smWebPageHelper_->throughputWebPage(out);
00581   }
00582   catch(std::exception &e)
00583   {
00584     errorMsg += ": ";
00585     errorMsg += e.what();
00586     
00587     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00588     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00589   }
00590   catch(...)
00591   {
00592     errorMsg += ": Unknown exception";
00593     
00594     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00595     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00596   }
00597 
00598 }
00599 
00600 
00601 // Leaving in for now but making it return an empty buffer:
00602 void StorageManager::consumerListWebPage(xgi::Input *in, xgi::Output *out)
00603   throw (xgi::exception::Exception)
00604 {
00605   out->getHTTPResponseHeader().addHeader( "Content-Type",
00606                                           "application/octet-stream" );
00607   out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
00608                                           "binary" );
00609   char buff;
00610   out->write( &buff, 0 );
00611 }
00612 
00613 
00615 // State Machine call back functions //
00617 
00618 xoap::MessageReference StorageManager::handleFSMSoapMessage( xoap::MessageReference msg )
00619   throw( xoap::exception::Exception )
00620 {
00621   std::string errorMsg;
00622   xoap::MessageReference returnMsg;
00623 
00624   try {
00625     errorMsg = "Failed to extract FSM event and parameters from SOAP message: ";
00626     std::string command = soaputils::extractParameters(msg, this);
00627 
00628     errorMsg = "Failed to put a '" + command + "' state machine event into command queue: ";
00629     if (command == "Configure")
00630     {
00631       sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Configure() ) );
00632     }
00633     else if (command == "Enable")
00634     {
00635       if (sharedResources_->configuration_->streamConfigurationHasChanged())
00636       {
00637         sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Reconfigure() ) );
00638       }
00639       sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Enable() ) );
00640     }
00641     else if (command == "Stop")
00642     {
00643       sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Stop() ) );
00644     }
00645     else if (command == "Halt")
00646     {
00647       sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Halt() ) );
00648     }
00649     else if (command == "EmergencyStop")
00650     {
00651       sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::EmergencyStop() ) );
00652     }
00653     else
00654     {
00655       XCEPT_RAISE(stor::exception::StateMachine,
00656         "Received an unknown state machine event '" + command + "'.");
00657     }
00658 
00659     errorMsg = "Failed to create FSM SOAP reply message: ";
00660     returnMsg = soaputils::createFsmSoapResponseMsg(command,
00661       sharedResources_->statisticsReporter_->
00662       getStateMachineMonitorCollection().externallyVisibleState());
00663   }
00664   catch (cms::Exception& e) {
00665     errorMsg += e.explainSelf();
00666     XCEPT_DECLARE(xoap::exception::Exception,
00667       sentinelException, errorMsg);
00668     sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00669     throw sentinelException;
00670   }
00671   catch (xcept::Exception &e) {
00672     XCEPT_DECLARE_NESTED(xoap::exception::Exception,
00673       sentinelException, errorMsg, e);
00674     sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00675     throw sentinelException;
00676   }
00677   catch (std::exception& e) {
00678     errorMsg += e.what();
00679     XCEPT_DECLARE(xoap::exception::Exception,
00680       sentinelException, errorMsg);
00681     sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00682     throw sentinelException;
00683   }
00684   catch (...) {
00685     errorMsg += "Unknown exception";
00686     XCEPT_DECLARE(xoap::exception::Exception,
00687       sentinelException, errorMsg);
00688     sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00689     throw sentinelException;
00690   }
00691 
00692   return returnMsg;
00693 }
00694 
00695 
00699 
00700 void
00701 StorageManager::processConsumerRegistrationRequest( xgi::Input* in, xgi::Output* out )
00702   throw( xgi::exception::Exception )
00703 {
00704   consumerUtils_->processConsumerRegistrationRequest(in,out);
00705 }
00706 
00707 
00708 void
00709 StorageManager::processConsumerHeaderRequest( xgi::Input* in, xgi::Output* out )
00710   throw( xgi::exception::Exception )
00711 {
00712   consumerUtils_->processConsumerHeaderRequest(in,out);
00713 }
00714 
00715 
00716 void
00717 StorageManager::processConsumerEventRequest( xgi::Input* in, xgi::Output* out )
00718   throw( xgi::exception::Exception )
00719 {
00720   consumerUtils_->processConsumerEventRequest(in,out);
00721 }
00722 
00723 
00724 void
00725 StorageManager::processDQMConsumerRegistrationRequest( xgi::Input* in, xgi::Output* out )
00726   throw( xgi::exception::Exception )
00727 {
00728   consumerUtils_->processDQMConsumerRegistrationRequest(in,out);
00729 }
00730 
00731 
00732 void
00733 StorageManager::processDQMConsumerEventRequest( xgi::Input* in, xgi::Output* out )
00734   throw( xgi::exception::Exception )
00735 {
00736   consumerUtils_->processDQMConsumerEventRequest(in,out);
00737 }
00738 
00739 namespace stor {
00741   // Specialization for ConsumerUtils //
00743   template<>
00744   void
00745   ConsumerUtils<Configuration,EventQueueCollection>::
00746   writeConsumerEvent(xgi::Output* out, const I2OChain& evt) const
00747   {
00748     writeHTTPHeaders( out );
00749     
00750     #ifdef STOR_DEBUG_CORRUPTED_EVENT_HEADER
00751     double r = rand()/static_cast<double>(RAND_MAX);
00752     if (r < 0.1)
00753     {
00754       std::cout << "Simulating corrupted event header" << std::endl;
00755       EventHeader* h = (EventHeader*)evt.dataLocation(0);
00756       h->protocolVersion_ = 1;
00757     }
00758     #endif // STOR_DEBUG_CORRUPTED_EVENT_HEADER
00759     
00760     const unsigned int nfrags = evt.fragmentCount();
00761     for ( unsigned int i = 0; i < nfrags; ++i )
00762     {
00763       const unsigned long len = evt.dataSize( i );
00764       unsigned char* location = evt.dataLocation( i );
00765       out->write( (char*)location, len );
00766     } 
00767   }
00768 }
00769 
00770 
00772 // *** Provides factory method for the instantiation of SM applications //
00774 // This macro is depreciated:
00775 XDAQ_INSTANTIATE(StorageManager)
00776 
00777 // One should use the XDAQ_INSTANTIATOR() in the header file
00778 // and this one here. But this breaks the backward compatibility,
00779 // as all xml configuration files would have to be changed to use
00780 // 'stor::StorageManager' instead of 'StorageManager'.
00781 // XDAQ_INSTANTIATOR_IMPL(stor::StorageManager)
00782 
00783 
00784 
00785 
00786 
00787 
00788 
00789