Go to the documentation of this file.00001
00003
00004 #include "EventFilter/StorageManager/interface/AlarmHandler.h"
00005 #include "EventFilter/StorageManager/interface/EventDistributor.h"
00006 #include "EventFilter/StorageManager/interface/DiscardManager.h"
00007 #include "EventFilter/StorageManager/interface/FragmentStore.h"
00008 #include "EventFilter/StorageManager/interface/I2OChain.h"
00009 #include "EventFilter/StorageManager/interface/Notifier.h"
00010 #include "EventFilter/StorageManager/interface/SharedResources.h"
00011 #include "EventFilter/StorageManager/interface/StateMachine.h"
00012 #include "EventFilter/StorageManager/interface/StatisticsReporter.h"
00013 #include "EventFilter/StorageManager/interface/TransitionRecord.h"
00014
00015 #include <iostream>
00016 #include <sstream>
00017
00018 #include "xcept/tools.h"
00019
00020 using namespace std;
00021 using namespace stor;
00022
00023 Processing::Processing( my_context c ): my_base(c)
00024 {
00025 safeEntryAction();
00026 }
00027
00028 void Processing::do_entryActionWork()
00029 {
00030 TransitionRecord tr( stateName(), true );
00031 outermost_context().updateHistory( tr );
00032 outermost_context().setExternallyVisibleState( "Enabled" );
00033 outermost_context().getNotifier()->reportNewState( "Enabled" );
00034 }
00035
00036 Processing::~Processing()
00037 {
00038 safeExitAction();
00039 }
00040
00041 void Processing::do_exitActionWork()
00042 {
00043 TransitionRecord tr( stateName(), false );
00044 outermost_context().updateHistory( tr );
00045 }
00046
00047 string Processing::do_stateName() const
00048 {
00049 return std::string( "Processing" );
00050 }
00051
00052 void Processing::do_moveToFailedState( xcept::Exception& exception ) const
00053 {
00054 outermost_context().getSharedResources()->alarmHandler_->moveToFailedState( exception );
00055 }
00056
00057 void Processing::logEndRunRequest( const EndRun& request )
00058 {
00059 outermost_context().unconsumed_event( request );
00060 }
00061
00062 void
00063 Processing::do_processI2OFragment( I2OChain& frag ) const
00064 {
00065 static unsigned int noFragmentCount = 0;
00066
00067 FragmentStore* fragmentStore = outermost_context().getFragmentStore();
00068 bool completed = fragmentStore->addFragment(frag);
00069 if ( completed )
00070 {
00071
00072
00073
00074 try
00075 {
00076 uint32_t runNumber = outermost_context().getSharedResources()->configuration_->getRunNumber();
00077 frag.assertRunNumber(runNumber);
00078 }
00079 catch(stor::exception::RunNumberMismatch &e)
00080 {
00081
00082 outermost_context().getSharedResources()->
00083 alarmHandler_->notifySentinel(AlarmHandler::ERROR, e);
00084 }
00085 outermost_context().getEventDistributor()->addEventToRelevantQueues(frag);
00086 outermost_context().getSharedResources()->discardManager_->sendDiscardMessage(frag);
00087 }
00088 else
00089 {
00090
00091
00092 ++noFragmentCount;
00093 if ( noFragmentCount >= 100 )
00094 {
00095 noFragmentCount = 0;
00096 this->noFragmentToProcess();
00097 }
00098 }
00099
00100
00101
00102
00103
00104
00105
00106
00107 ThroughputMonitorCollection& tmc =
00108 outermost_context().getSharedResources()->statisticsReporter_->
00109 getThroughputMonitorCollection();
00110 tmc.setFragmentStoreSize(fragmentStore->size());
00111 tmc.setFragmentStoreMemoryUsed(fragmentStore->memoryUsed());
00112 }
00113
00114 void
00115 Processing::do_noFragmentToProcess() const
00116 {
00117 I2OChain staleEvent;
00118
00119 WorkerThreadParams workerParams =
00120 outermost_context().getSharedResources()->configuration_->getWorkerThreadParams();
00121 bool gotStaleEvent =
00122 outermost_context().getFragmentStore()->
00123 getStaleEvent(staleEvent, workerParams.staleFragmentTimeOut_);
00124
00125 if ( gotStaleEvent )
00126 {
00127 outermost_context().getSharedResources()->discardManager_->sendDiscardMessage(staleEvent);
00128 outermost_context().getEventDistributor()->addEventToRelevantQueues(staleEvent);
00129 }
00130 outermost_context().getEventDistributor()->checkForStaleConsumers();
00131 }
00132
00133