Go to the documentation of this file.00001
00003
00004 #include "EventFilter/StorageManager/interface/AlarmHandler.h"
00005 #include "EventFilter/StorageManager/interface/CommandQueue.h"
00006 #include "EventFilter/StorageManager/interface/DiscardManager.h"
00007 #include "EventFilter/StorageManager/interface/DiskWriter.h"
00008 #include "EventFilter/StorageManager/interface/DiskWriterResources.h"
00009 #include "EventFilter/StorageManager/interface/EventDistributor.h"
00010 #include "EventFilter/StorageManager/interface/FragmentStore.h"
00011 #include "EventFilter/StorageManager/interface/I2OChain.h"
00012 #include "EventFilter/StorageManager/interface/Notifier.h"
00013 #include "EventFilter/StorageManager/interface/SharedResources.h"
00014 #include "EventFilter/StorageManager/interface/StateMachine.h"
00015 #include "EventFilter/StorageManager/interface/TransitionRecord.h"
00016
00017 #include <iostream>
00018 #include <unistd.h>
00019
00020 using namespace std;
00021 using namespace stor;
00022
00023 DrainingQueues::DrainingQueues( my_context c ): my_base(c)
00024 {
00025 safeEntryAction();
00026 }
00027
00028 void DrainingQueues::do_entryActionWork()
00029 {
00030 TransitionRecord tr( stateName(), true );
00031 outermost_context().updateHistory( tr );
00032 }
00033
00034 DrainingQueues::~DrainingQueues()
00035 {
00036 safeExitAction();
00037 }
00038
00039 void DrainingQueues::do_exitActionWork()
00040 {
00041 TransitionRecord tr( stateName(), false );
00042 outermost_context().updateHistory( tr );
00043 }
00044
00045 string DrainingQueues::do_stateName() const
00046 {
00047 return std::string( "DrainingQueues" );
00048 }
00049
00050 void DrainingQueues::do_moveToFailedState( xcept::Exception& exception ) const
00051 {
00052 outermost_context().getSharedResources()->alarmHandler_->moveToFailedState( exception );
00053 }
00054
00055 void DrainingQueues::logEndRunRequest( const EndRun& request )
00056 {
00057 outermost_context().unconsumed_event( request );
00058 }
00059
00060 void
00061 DrainingQueues::do_noFragmentToProcess() const
00062 {
00063 if ( allQueuesAndWorkersAreEmpty() )
00064 {
00065 SharedResourcesPtr sharedResources =
00066 outermost_context().getSharedResources();
00067 EventPtr_t stMachEvent( new QueuesEmpty() );
00068 sharedResources->commandQueue_->enqWait( stMachEvent );
00069 }
00070 }
00071
00072 bool
00073 DrainingQueues::allQueuesAndWorkersAreEmpty() const
00074 {
00075 SharedResourcesPtr sharedResources =
00076 outermost_context().getSharedResources();
00077
00078
00079
00080
00081 EventDistributor *ed = outermost_context().getEventDistributor();
00082 if ( ed->full() ) return false;
00083
00084 processStaleFragments();
00085 FragmentStore *fs = outermost_context().getFragmentStore();
00086 if ( ! fs->empty() ) return false;
00087
00088 if ( ! sharedResources->streamQueue_->empty() ) return false;
00089
00090 if ( sharedResources->diskWriterResources_->isBusy() ) return false;
00091
00092
00093
00094 sharedResources->dqmEventQueue_->clear();
00095
00096 return true;
00097 }
00098
00099 void
00100 DrainingQueues::processStaleFragments() const
00101 {
00102 I2OChain staleEvent;
00103 bool gotStaleEvent = true;
00104 int loopCounter = 0;
00105
00106 EventDistributor *ed = outermost_context().getEventDistributor();
00107
00108 while ( gotStaleEvent && !ed->full() && loopCounter++ < 10 )
00109 {
00110 gotStaleEvent =
00111 outermost_context().getFragmentStore()->getStaleEvent(staleEvent, boost::posix_time::seconds(0));
00112 if ( gotStaleEvent )
00113 {
00114 outermost_context().getSharedResources()->discardManager_->sendDiscardMessage(staleEvent);
00115 ed->addEventToRelevantQueues(staleEvent);
00116 }
00117 }
00118 }
00119
00120