Go to the documentation of this file.00001
00003
00004 #include "EventFilter/StorageManager/interface/EventDistributor.h"
00005 #include "EventFilter/StorageManager/interface/DiscardManager.h"
00006 #include "EventFilter/StorageManager/interface/FragmentStore.h"
00007 #include "EventFilter/StorageManager/interface/I2OChain.h"
00008 #include "EventFilter/StorageManager/interface/Notifier.h"
00009 #include "EventFilter/StorageManager/interface/SharedResources.h"
00010 #include "EventFilter/StorageManager/interface/StateMachine.h"
00011 #include "EventFilter/StorageManager/interface/StatisticsReporter.h"
00012 #include "EventFilter/StorageManager/interface/TransitionRecord.h"
00013
00014 #include <iostream>
00015 #include <sstream>
00016
00017 #include "xcept/tools.h"
00018
00019 using namespace std;
00020 using namespace stor;
00021
00022 Processing::Processing( my_context c ): my_base(c)
00023 {
00024 safeEntryAction();
00025 }
00026
00027 void Processing::do_entryActionWork()
00028 {
00029 TransitionRecord tr( stateName(), true );
00030 outermost_context().updateHistory( tr );
00031 outermost_context().setExternallyVisibleState( "Enabled" );
00032 outermost_context().getNotifier()->reportNewState( "Enabled" );
00033 }
00034
00035 Processing::~Processing()
00036 {
00037 safeExitAction();
00038 }
00039
00040 void Processing::do_exitActionWork()
00041 {
00042 TransitionRecord tr( stateName(), false );
00043 outermost_context().updateHistory( tr );
00044 }
00045
00046 string Processing::do_stateName() const
00047 {
00048 return std::string( "Processing" );
00049 }
00050
00051 void Processing::do_moveToFailedState( xcept::Exception& exception ) const
00052 {
00053 outermost_context().getSharedResources()->moveToFailedState( exception );
00054 }
00055
00056 void Processing::logEndRunRequest( const EndRun& request )
00057 {
00058 outermost_context().unconsumed_event( request );
00059 }
00060
00061 void
00062 Processing::do_processI2OFragment( I2OChain& frag ) const
00063 {
00064 static unsigned int noFragmentCount = 0;
00065
00066 FragmentStore* fragmentStore = outermost_context().getFragmentStore();
00067 bool completed = fragmentStore->addFragment(frag);
00068 if ( completed )
00069 {
00070
00071
00072
00073 try
00074 {
00075 uint32_t runNumber = outermost_context().getSharedResources()->configuration_->getRunNumber();
00076 frag.assertRunNumber(runNumber);
00077 }
00078 catch(stor::exception::RunNumberMismatch &e)
00079 {
00080
00081 outermost_context().getSharedResources()->statisticsReporter_->
00082 alarmHandler()->notifySentinel(AlarmHandler::ERROR, e);
00083 }
00084 outermost_context().getEventDistributor()->addEventToRelevantQueues(frag);
00085 outermost_context().getSharedResources()->discardManager_->sendDiscardMessage(frag);
00086 }
00087 else
00088 {
00089
00090
00091 ++noFragmentCount;
00092 if ( noFragmentCount >= 100 )
00093 {
00094 noFragmentCount = 0;
00095 this->noFragmentToProcess();
00096 }
00097 }
00098
00099
00100
00101
00102
00103
00104
00105
00106 ThroughputMonitorCollection& tmc =
00107 outermost_context().getSharedResources()->statisticsReporter_->
00108 getThroughputMonitorCollection();
00109 tmc.setFragmentStoreSize(fragmentStore->size());
00110 tmc.setFragmentStoreMemoryUsed(fragmentStore->memoryUsed());
00111 }
00112
00113 void
00114 Processing::do_noFragmentToProcess() const
00115 {
00116 I2OChain staleEvent;
00117
00118 WorkerThreadParams workerParams =
00119 outermost_context().getSharedResources()->configuration_->getWorkerThreadParams();
00120 bool gotStaleEvent =
00121 outermost_context().getFragmentStore()->
00122 getStaleEvent(staleEvent, workerParams.staleFragmentTimeOut_);
00123
00124 if ( gotStaleEvent )
00125 {
00126 outermost_context().getSharedResources()->discardManager_->sendDiscardMessage(staleEvent);
00127 outermost_context().getEventDistributor()->addEventToRelevantQueues(staleEvent);
00128 }
00129 outermost_context().getEventDistributor()->checkForStaleConsumers();
00130 }
00131
00132