CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_5_3_14/src/EventFilter/StorageManager/src/StorageManager.cc

Go to the documentation of this file.
00001 // $Id: StorageManager.cc,v 1.138 2011/11/17 17:35:40 mommsen Exp $
00003 
00004 #include "EventFilter/StorageManager/interface/DiskWriter.h"
00005 #include "EventFilter/StorageManager/interface/DiskWriterResources.h"
00006 #include "EventFilter/StorageManager/interface/DQMEventProcessor.h"
00007 #include "EventFilter/StorageManager/interface/DQMEventProcessorResources.h"
00008 #include "EventFilter/StorageManager/interface/DiscardManager.h"
00009 #include "EventFilter/StorageManager/interface/Exception.h"
00010 #include "EventFilter/StorageManager/interface/FragmentMonitorCollection.h"
00011 #include "EventFilter/StorageManager/interface/FragmentProcessor.h"
00012 #include "EventFilter/StorageManager/interface/SoapUtils.h"
00013 #include "EventFilter/StorageManager/interface/StorageManager.h"
00014 #include "EventFilter/StorageManager/interface/StateMachine.h"
00015 #include "EventFilter/StorageManager/src/ConsumerUtils.icc"
00016 
00017 #include "EventFilter/Utilities/interface/i2oEvfMsgs.h"
00018 
00019 #include "FWCore/Utilities/interface/EDMException.h"
00020 
00021 #include "i2o/Method.h"
00022 #include "interface/shared/version.h"
00023 #include "interface/shared/i2oXFunctionCodes.h"
00024 #include "xcept/tools.h"
00025 #include "xdaq/NamespaceURI.h"
00026 #include "xdata/InfoSpaceFactory.h"
00027 #include "xgi/Method.h"
00028 #include "xoap/Method.h"
00029 
00030 #include <boost/lexical_cast.hpp>
00031 #include <boost/shared_ptr.hpp>
00032 
00033 #include <cstdlib>
00034 
00035 using namespace std;
00036 using namespace stor;
00037 
00038 
00039 StorageManager::StorageManager(xdaq::ApplicationStub * s) :
00040   xdaq::Application(s)
00041 {  
00042   LOG4CPLUS_INFO(this->getApplicationLogger(),"Making StorageManager");
00043 
00044   // bind all callback functions
00045   bindI2OCallbacks();
00046   bindStateMachineCallbacks();
00047   bindWebInterfaceCallbacks();
00048   bindConsumerCallbacks();
00049 
00050   std::string errorMsg = "Exception in StorageManager constructor: ";
00051   try
00052   {
00053     initializeSharedResources();
00054   }
00055   catch(std::exception &e)
00056   {
00057     errorMsg += e.what();
00058     LOG4CPLUS_FATAL( getApplicationLogger(), e.what() );
00059     XCEPT_RAISE( stor::exception::Exception, e.what() );
00060   }
00061   catch(...)
00062   {
00063     errorMsg += "unknown exception";
00064     LOG4CPLUS_FATAL( getApplicationLogger(), errorMsg );
00065     XCEPT_RAISE( stor::exception::Exception, errorMsg );
00066   }
00067 
00068   startWorkerThreads();
00069 }
00070 
00071 
00072 void StorageManager::bindI2OCallbacks()
00073 {
00074   i2o::bind(this,
00075             &StorageManager::receiveRegistryMessage,
00076             I2O_SM_PREAMBLE,
00077             XDAQ_ORGANIZATION_ID);
00078   i2o::bind(this,
00079             &StorageManager::receiveDataMessage,
00080             I2O_SM_DATA,
00081             XDAQ_ORGANIZATION_ID);
00082   i2o::bind(this,
00083             &StorageManager::receiveErrorDataMessage,
00084             I2O_SM_ERROR,
00085             XDAQ_ORGANIZATION_ID);
00086   i2o::bind(this,
00087             &StorageManager::receiveDQMMessage,
00088             I2O_SM_DQM,
00089             XDAQ_ORGANIZATION_ID);
00090   i2o::bind(this,
00091             &StorageManager::receiveEndOfLumiSectionMessage,
00092             I2O_EVM_LUMISECTION,
00093             XDAQ_ORGANIZATION_ID);
00094 }
00095 
00096 
00097 void StorageManager::bindStateMachineCallbacks()
00098 {
00099   xoap::bind( this,
00100               &StorageManager::handleFSMSoapMessage,
00101               "Configure",
00102               XDAQ_NS_URI );
00103   xoap::bind( this,
00104               &StorageManager::handleFSMSoapMessage,
00105               "Enable",
00106               XDAQ_NS_URI );
00107   xoap::bind( this,
00108               &StorageManager::handleFSMSoapMessage,
00109               "Stop",
00110               XDAQ_NS_URI );
00111   xoap::bind( this,
00112               &StorageManager::handleFSMSoapMessage,
00113               "Halt",
00114               XDAQ_NS_URI );
00115   xoap::bind( this,
00116               &StorageManager::handleFSMSoapMessage,
00117               "EmergencyStop",
00118               XDAQ_NS_URI );
00119 }
00120 
00121 
00122 void StorageManager::bindWebInterfaceCallbacks()
00123 {
00124   xgi::bind(this,&StorageManager::css,                      "styles.css");
00125   xgi::bind(this,&StorageManager::defaultWebPage,           "Default");
00126   xgi::bind(this,&StorageManager::inputWebPage,             "input");
00127   xgi::bind(this,&StorageManager::storedDataWebPage,        "storedData");
00128   xgi::bind(this,&StorageManager::rbsenderWebPage,          "rbsenderlist");
00129   xgi::bind(this,&StorageManager::rbsenderDetailWebPage,    "rbsenderdetail");
00130   xgi::bind(this,&StorageManager::fileStatisticsWebPage,    "fileStatistics");
00131   xgi::bind(this,&StorageManager::dqmEventStatisticsWebPage,"dqmEventStatistics");
00132   xgi::bind(this,&StorageManager::consumerStatisticsPage,   "consumerStatistics" );
00133   xgi::bind(this,&StorageManager::consumerListWebPage,      "consumerList");
00134   xgi::bind(this,&StorageManager::throughputWebPage,        "throughputStatistics");
00135 }
00136 
00137 
00138 void StorageManager::bindConsumerCallbacks()
00139 {
00140   // event consumers
00141   xgi::bind( this, &StorageManager::processConsumerRegistrationRequest, "registerConsumer" );
00142   xgi::bind( this, &StorageManager::processConsumerHeaderRequest, "getregdata" );
00143   xgi::bind( this, &StorageManager::processConsumerEventRequest, "geteventdata" );
00144 
00145   // dqm event consumers
00146   xgi::bind(this,&StorageManager::processDQMConsumerRegistrationRequest, "registerDQMConsumer");
00147   xgi::bind(this,&StorageManager::processDQMConsumerEventRequest, "getDQMeventdata");
00148 }
00149 
00150 
00151 void StorageManager::initializeSharedResources()
00152 {
00153   sharedResources_.reset(new SharedResources());
00154 
00155   xdata::InfoSpace *ispace = getApplicationInfoSpace();
00156   unsigned long instance = getApplicationDescriptor()->getInstance();
00157   sharedResources_->configuration_.reset(new Configuration(ispace, instance));
00158 
00159   QueueConfigurationParams queueParams =
00160     sharedResources_->configuration_->getQueueConfigurationParams();
00161   sharedResources_->commandQueue_.
00162     reset(new CommandQueue(queueParams.commandQueueSize_));
00163   sharedResources_->fragmentQueue_.
00164     reset(new FragmentQueue(queueParams.fragmentQueueSize_, queueParams.fragmentQueueMemoryLimitMB_ * 1024*1024));
00165   sharedResources_->registrationQueue_.
00166     reset(new RegistrationQueue(queueParams.registrationQueueSize_));
00167   sharedResources_->streamQueue_.
00168     reset(new StreamQueue(queueParams.streamQueueSize_, queueParams.streamQueueMemoryLimitMB_ * 1024*1024));
00169   sharedResources_->dqmEventQueue_.
00170     reset(new DQMEventQueue(queueParams.dqmEventQueueSize_, queueParams.dqmEventQueueMemoryLimitMB_ * 1024*1024));
00171 
00172   sharedResources_->alarmHandler_.reset( new AlarmHandler(this, sharedResources_) );
00173   sharedResources_->statisticsReporter_.reset(
00174     new StatisticsReporter(this, sharedResources_)
00175   );
00176   sharedResources_->initMsgCollection_.reset(new InitMsgCollection());
00177   sharedResources_->diskWriterResources_.reset(new DiskWriterResources());
00178   sharedResources_->dqmEventProcessorResources_.reset(new DQMEventProcessorResources());
00179 
00180   sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setFragmentQueue(sharedResources_->fragmentQueue_);
00181   sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setStreamQueue(sharedResources_->streamQueue_);
00182   sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setDQMEventQueue(sharedResources_->dqmEventQueue_);
00183 
00184   sharedResources_->
00185     discardManager_.reset(new DiscardManager(getApplicationContext(),
00186                                              getApplicationDescriptor(),
00187                                              sharedResources_->statisticsReporter_->
00188                                              getDataSenderMonitorCollection()));
00189 
00190   sharedResources_->registrationCollection_.reset( new RegistrationCollection() );
00191   EventConsumerMonitorCollection& ecmc = 
00192     sharedResources_->statisticsReporter_->getEventConsumerMonitorCollection();
00193   sharedResources_->eventQueueCollection_.reset( new EventQueueCollection( ecmc ) );
00194 
00195   DQMConsumerMonitorCollection& dcmc = 
00196     sharedResources_->statisticsReporter_->getDQMConsumerMonitorCollection();
00197   sharedResources_->dqmEventQueueCollection_.reset( new DQMEventQueueCollection( dcmc ) );
00198 
00199   consumerUtils_.reset( new ConsumerUtils_t(
00200       sharedResources_->configuration_,
00201       sharedResources_->registrationCollection_,
00202       sharedResources_->registrationQueue_,
00203       sharedResources_->initMsgCollection_,
00204       sharedResources_->eventQueueCollection_,
00205       sharedResources_->dqmEventQueueCollection_,
00206       sharedResources_->alarmHandler_
00207     ) );
00208 
00209   smWebPageHelper_.reset( new SMWebPageHelper(
00210       getApplicationDescriptor(), sharedResources_));
00211 
00212 }
00213 
00214 
00215 void StorageManager::startWorkerThreads()
00216 {
00217 
00218   // Start the workloops
00219   try
00220   {
00221     fragmentProcessor_.reset( new FragmentProcessor( this, sharedResources_ ) );
00222     diskWriter_.reset( new DiskWriter(this, sharedResources_) );
00223     dqmEventProcessor_.reset( new DQMEventProcessor(this, sharedResources_) );
00224     sharedResources_->statisticsReporter_->startWorkLoop("theStatisticsReporter");
00225     fragmentProcessor_->startWorkLoop("theFragmentProcessor");
00226     diskWriter_->startWorkLoop("theDiskWriter");
00227     dqmEventProcessor_->startWorkLoop("theDQMEventProcessor");
00228   }
00229   catch(xcept::Exception &e)
00230   {
00231     sharedResources_->alarmHandler_->moveToFailedState( e );
00232   }
00233   catch(std::exception &e)
00234   {
00235     XCEPT_DECLARE(stor::exception::Exception,
00236       sentinelException, e.what());
00237     sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00238   }
00239   catch(...)
00240   {
00241     std::string errorMsg = "Unknown exception when starting the workloops";
00242     XCEPT_DECLARE(stor::exception::Exception,
00243       sentinelException, errorMsg);
00244     sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00245   }
00246 }
00247 
00248 
00250 // I2O call back functions //
00252 
00253 void StorageManager::receiveRegistryMessage(toolbox::mem::Reference *ref)
00254 {
00255   I2OChain i2oChain(ref);
00256 
00257   // Set the I2O message pool pointer. Only done for init messages.
00258   ThroughputMonitorCollection& throughputMonCollection =
00259     sharedResources_->statisticsReporter_->getThroughputMonitorCollection();
00260   throughputMonCollection.setMemoryPoolPointer( ref->getBuffer()->getPool() );
00261 
00262   FragmentMonitorCollection& fragMonCollection =
00263     sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00264   fragMonCollection.getAllFragmentSizeMQ().addSample( 
00265     static_cast<double>( i2oChain.totalDataSize() ) / 0x100000
00266   );
00267 
00268   sharedResources_->fragmentQueue_->enqWait(i2oChain);
00269 }
00270 
00271 
00272 void StorageManager::receiveDataMessage(toolbox::mem::Reference *ref)
00273 {
00274   I2OChain i2oChain(ref);
00275 
00276   FragmentMonitorCollection& fragMonCollection =
00277     sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00278   fragMonCollection.addEventFragmentSample( i2oChain.totalDataSize() );
00279 
00280   sharedResources_->fragmentQueue_->enqWait(i2oChain);
00281 
00282 #ifdef STOR_DEBUG_DUPLICATE_MESSAGES
00283   double r = rand()/static_cast<double>(RAND_MAX);
00284   if (r < 0.001)
00285   {
00286     LOG4CPLUS_INFO(this->getApplicationLogger(), "Simulating duplicated data message");
00287     receiveDataMessage(ref->duplicate());
00288   }
00289 #endif
00290 }
00291 
00292 
00293 void StorageManager::receiveErrorDataMessage(toolbox::mem::Reference *ref)
00294 {
00295   I2OChain i2oChain(ref);
00296 
00297   FragmentMonitorCollection& fragMonCollection =
00298     sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00299   fragMonCollection.addEventFragmentSample( i2oChain.totalDataSize() );
00300 
00301   sharedResources_->fragmentQueue_->enqWait(i2oChain);
00302 }
00303 
00304 
00305 void StorageManager::receiveDQMMessage(toolbox::mem::Reference *ref)
00306 {
00307   I2OChain i2oChain(ref);
00308 
00309   FragmentMonitorCollection& fragMonCollection =
00310     sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00311   fragMonCollection.addDQMEventFragmentSample( i2oChain.totalDataSize() );
00312 
00313   sharedResources_->fragmentQueue_->enqWait(i2oChain);
00314 }
00315 
00316 
00317 void StorageManager::receiveEndOfLumiSectionMessage(toolbox::mem::Reference *ref)
00318 {
00319   I2OChain i2oChain( ref );
00320 
00321   FragmentMonitorCollection& fragMonCollection =
00322     sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
00323   fragMonCollection.addFragmentSample( i2oChain.totalDataSize() );
00324 
00325   RunMonitorCollection& runMonCollection =
00326     sharedResources_->statisticsReporter_->getRunMonitorCollection();
00327   runMonCollection.getEoLSSeenMQ().addSample( i2oChain.lumiSection() );
00328 
00329   sharedResources_->streamQueue_->enqWait( i2oChain );
00330 }
00331 
00332 
00334 // Web interface call back functions //
00336 
00337 void StorageManager::css(xgi::Input *in, xgi::Output *out)
00338 throw (xgi::exception::Exception)
00339 {
00340   smWebPageHelper_->css(in,out);
00341 }
00342 
00343 
00344 void StorageManager::defaultWebPage(xgi::Input *in, xgi::Output *out)
00345 throw (xgi::exception::Exception)
00346 {
00347   std::string errorMsg = "Failed to create the default webpage";
00348   
00349   try
00350   {
00351     smWebPageHelper_->defaultWebPage(out);
00352   }
00353   catch(std::exception &e)
00354   {
00355     errorMsg += ": ";
00356     errorMsg += e.what();
00357     
00358     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00359     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00360   }
00361   catch(...)
00362   {
00363     errorMsg += ": Unknown exception";
00364     
00365     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00366     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00367   }
00368 }
00369 
00370 
00371 void StorageManager::inputWebPage(xgi::Input *in, xgi::Output *out)
00372   throw (xgi::exception::Exception)
00373 {
00374   std::string errorMsg = "Failed to create the I2O input webpage";
00375 
00376   try
00377   {
00378     smWebPageHelper_->inputWebPage(out);
00379   }
00380   catch(std::exception &e)
00381   {
00382     errorMsg += ": ";
00383     errorMsg += e.what();
00384     
00385     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00386     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00387   }
00388   catch(...)
00389   {
00390     errorMsg += ": Unknown exception";
00391     
00392     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00393     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00394   }
00395 }
00396 
00397 
00398 void StorageManager::storedDataWebPage(xgi::Input *in, xgi::Output *out)
00399   throw (xgi::exception::Exception)
00400 {
00401   std::string errorMsg = "Failed to create the stored data webpage";
00402 
00403   try
00404   {
00405     smWebPageHelper_->storedDataWebPage(out);
00406   }
00407   catch(std::exception &e)
00408   {
00409     errorMsg += ": ";
00410     errorMsg += e.what();
00411     
00412     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00413     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00414   }
00415   catch(...)
00416   {
00417     errorMsg += ": Unknown exception";
00418     
00419     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00420     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00421   }
00422 }
00423 
00424 
00425 void StorageManager::consumerStatisticsPage( xgi::Input* in,
00426                                              xgi::Output* out )
00427   throw( xgi::exception::Exception )
00428 {
00429 
00430   std::string err_msg =
00431     "Failed to create consumer statistics page";
00432 
00433   try
00434   {
00435     smWebPageHelper_->consumerStatistics(out);
00436   }
00437   catch( std::exception &e )
00438   {
00439     err_msg += ": ";
00440     err_msg += e.what();
00441     LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
00442     XCEPT_RAISE( xgi::exception::Exception, err_msg );
00443   }
00444   catch(...)
00445   {
00446     err_msg += ": Unknown exception";
00447     LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
00448     XCEPT_RAISE( xgi::exception::Exception, err_msg );
00449   }
00450 
00451 }
00452 
00453 
00454 void StorageManager::rbsenderWebPage(xgi::Input *in, xgi::Output *out)
00455   throw (xgi::exception::Exception)
00456 {
00457   std::string errorMsg = "Failed to create the data sender webpage";
00458 
00459   try
00460   {
00461     smWebPageHelper_->resourceBrokerOverview(out);
00462   }
00463   catch(std::exception &e)
00464   {
00465     errorMsg += ": ";
00466     errorMsg += e.what();
00467     
00468     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00469     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00470   }
00471   catch(...)
00472   {
00473     errorMsg += ": Unknown exception";
00474     
00475     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00476     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00477   }
00478 
00479 }
00480 
00481 
00482 void StorageManager::rbsenderDetailWebPage(xgi::Input *in, xgi::Output *out)
00483   throw (xgi::exception::Exception)
00484 {
00485   std::string errorMsg = "Failed to create the data sender webpage";
00486 
00487   try
00488   {
00489     long long localRBID = 0;
00490     cgicc::Cgicc cgiWrapper(in);
00491     cgicc::const_form_iterator updateRef = cgiWrapper.getElement("id");
00492     if (updateRef != cgiWrapper.getElements().end())
00493     {
00494       std::string idString = updateRef->getValue();
00495       localRBID = boost::lexical_cast<long long>(idString);
00496     }
00497 
00498     smWebPageHelper_->resourceBrokerDetail(out, localRBID);
00499   }
00500   catch(std::exception &e)
00501   {
00502     errorMsg += ": ";
00503     errorMsg += e.what();
00504     
00505     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00506     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00507   }
00508   catch(...)
00509   {
00510     errorMsg += ": Unknown exception";
00511     
00512     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00513     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00514   }
00515 
00516 }
00517 
00518 
00519 void StorageManager::fileStatisticsWebPage(xgi::Input *in, xgi::Output *out)
00520   throw (xgi::exception::Exception)
00521 {
00522   std::string errorMsg = "Failed to create the file statistics webpage";
00523 
00524   try
00525   {
00526     smWebPageHelper_->filesWebPage(out);
00527   }
00528   catch(std::exception &e)
00529   {
00530     errorMsg += ": ";
00531     errorMsg += e.what();
00532     
00533     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00534     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00535   }
00536   catch(...)
00537   {
00538     errorMsg += ": Unknown exception";
00539     
00540     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00541     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00542   }
00543 
00544 }
00545 
00546 
00547 void StorageManager::dqmEventStatisticsWebPage(xgi::Input *in, xgi::Output *out)
00548   throw (xgi::exception::Exception)
00549 {
00550   std::string errorMsg = "Failed to create the DQM event statistics webpage";
00551 
00552   try
00553   {
00554     smWebPageHelper_->dqmEventWebPage(out);
00555   }
00556   catch(std::exception &e)
00557   {
00558     errorMsg += ": ";
00559     errorMsg += e.what();
00560     
00561     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00562     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00563   }
00564   catch(...)
00565   {
00566     errorMsg += ": Unknown exception";
00567     
00568     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00569     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00570   }
00571 }
00572 
00573 
00574 void StorageManager::throughputWebPage(xgi::Input *in, xgi::Output *out)
00575   throw (xgi::exception::Exception)
00576 {
00577   std::string errorMsg = "Failed to create the throughput statistics webpage";
00578 
00579   try
00580   {
00581     smWebPageHelper_->throughputWebPage(out);
00582   }
00583   catch(std::exception &e)
00584   {
00585     errorMsg += ": ";
00586     errorMsg += e.what();
00587     
00588     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00589     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00590   }
00591   catch(...)
00592   {
00593     errorMsg += ": Unknown exception";
00594     
00595     LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
00596     XCEPT_RAISE(xgi::exception::Exception, errorMsg);
00597   }
00598 
00599 }
00600 
00601 
00602 // Leaving in for now but making it return an empty buffer:
00603 void StorageManager::consumerListWebPage(xgi::Input *in, xgi::Output *out)
00604   throw (xgi::exception::Exception)
00605 {
00606   out->getHTTPResponseHeader().addHeader( "Content-Type",
00607                                           "application/octet-stream" );
00608   out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
00609                                           "binary" );
00610   char buff;
00611   out->write( &buff, 0 );
00612 }
00613 
00614 
00616 // State Machine call back functions //
00618 
00619 xoap::MessageReference StorageManager::handleFSMSoapMessage( xoap::MessageReference msg )
00620   throw( xoap::exception::Exception )
00621 {
00622   std::string errorMsg;
00623   xoap::MessageReference returnMsg;
00624 
00625   try {
00626     errorMsg = "Failed to extract FSM event and parameters from SOAP message: ";
00627     std::string command = soaputils::extractParameters(msg, this);
00628 
00629     errorMsg = "Failed to put a '" + command + "' state machine event into command queue: ";
00630     if (command == "Configure")
00631     {
00632       sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Configure() ) );
00633     }
00634     else if (command == "Enable")
00635     {
00636       if (sharedResources_->configuration_->streamConfigurationHasChanged())
00637       {
00638         sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Reconfigure() ) );
00639       }
00640       sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Enable() ) );
00641     }
00642     else if (command == "Stop")
00643     {
00644       sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Stop() ) );
00645     }
00646     else if (command == "Halt")
00647     {
00648       sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Halt() ) );
00649     }
00650     else if (command == "EmergencyStop")
00651     {
00652       sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::EmergencyStop() ) );
00653     }
00654     else
00655     {
00656       XCEPT_RAISE(stor::exception::StateMachine,
00657         "Received an unknown state machine event '" + command + "'.");
00658     }
00659 
00660     errorMsg = "Failed to create FSM SOAP reply message: ";
00661     returnMsg = soaputils::createFsmSoapResponseMsg(command,
00662       sharedResources_->statisticsReporter_->
00663       getStateMachineMonitorCollection().externallyVisibleState());
00664   }
00665   catch (cms::Exception& e) {
00666     errorMsg += e.explainSelf();
00667     XCEPT_DECLARE(xoap::exception::Exception,
00668       sentinelException, errorMsg);
00669     sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00670     throw sentinelException;
00671   }
00672   catch (xcept::Exception &e) {
00673     XCEPT_DECLARE_NESTED(xoap::exception::Exception,
00674       sentinelException, errorMsg, e);
00675     sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00676     throw sentinelException;
00677   }
00678   catch (std::exception& e) {
00679     errorMsg += e.what();
00680     XCEPT_DECLARE(xoap::exception::Exception,
00681       sentinelException, errorMsg);
00682     sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00683     throw sentinelException;
00684   }
00685   catch (...) {
00686     errorMsg += "Unknown exception";
00687     XCEPT_DECLARE(xoap::exception::Exception,
00688       sentinelException, errorMsg);
00689     sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
00690     throw sentinelException;
00691   }
00692 
00693   return returnMsg;
00694 }
00695 
00696 
00700 
00701 void
00702 StorageManager::processConsumerRegistrationRequest( xgi::Input* in, xgi::Output* out )
00703   throw( xgi::exception::Exception )
00704 {
00705   consumerUtils_->processConsumerRegistrationRequest(in,out);
00706 }
00707 
00708 
00709 void
00710 StorageManager::processConsumerHeaderRequest( xgi::Input* in, xgi::Output* out )
00711   throw( xgi::exception::Exception )
00712 {
00713   consumerUtils_->processConsumerHeaderRequest(in,out);
00714 }
00715 
00716 
00717 void
00718 StorageManager::processConsumerEventRequest( xgi::Input* in, xgi::Output* out )
00719   throw( xgi::exception::Exception )
00720 {
00721   consumerUtils_->processConsumerEventRequest(in,out);
00722 }
00723 
00724 
00725 void
00726 StorageManager::processDQMConsumerRegistrationRequest( xgi::Input* in, xgi::Output* out )
00727   throw( xgi::exception::Exception )
00728 {
00729   consumerUtils_->processDQMConsumerRegistrationRequest(in,out);
00730 }
00731 
00732 
00733 void
00734 StorageManager::processDQMConsumerEventRequest( xgi::Input* in, xgi::Output* out )
00735   throw( xgi::exception::Exception )
00736 {
00737   consumerUtils_->processDQMConsumerEventRequest(in,out);
00738 }
00739 
00740 namespace stor {
00742   // Specialization for ConsumerUtils //
00744   template<>
00745   void
00746   ConsumerUtils<Configuration,EventQueueCollection>::
00747   writeConsumerEvent(xgi::Output* out, const I2OChain& evt) const
00748   {
00749     writeHTTPHeaders( out );
00750     
00751     #ifdef STOR_DEBUG_CORRUPTED_EVENT_HEADER
00752     double r = rand()/static_cast<double>(RAND_MAX);
00753     if (r < 0.1)
00754     {
00755       std::cout << "Simulating corrupted event header" << std::endl;
00756       EventHeader* h = (EventHeader*)evt.dataLocation(0);
00757       h->protocolVersion_ = 1;
00758     }
00759     #endif // STOR_DEBUG_CORRUPTED_EVENT_HEADER
00760     
00761     const unsigned int nfrags = evt.fragmentCount();
00762     for ( unsigned int i = 0; i < nfrags; ++i )
00763     {
00764       const unsigned long len = evt.dataSize( i );
00765       unsigned char* location = evt.dataLocation( i );
00766       out->write( (char*)location, len );
00767     } 
00768   }
00769 }
00770 
00771 
00773 // *** Provides factory method for the instantiation of SM applications //
00775 // This macro is deprecated:
00776 XDAQ_INSTANTIATE(StorageManager)
00777 
00778 // One should use the XDAQ_INSTANTIATOR() in the header file
00779 // and this one here. But this breaks the backward compatibility,
00780 // as all xml configuration files would have to be changed to use
00781 // 'stor::StorageManager' instead of 'StorageManager'.
00782 // XDAQ_INSTANTIATOR_IMPL(stor::StorageManager)
00783 
00784 
00785 
00786 
00787 
00788 
00789 
00790