Go to the documentation of this file.00001
00003
00004 #include "EventFilter/StorageManager/interface/CommandQueue.h"
00005 #include "EventFilter/StorageManager/interface/DiscardManager.h"
00006 #include "EventFilter/StorageManager/interface/DiskWriter.h"
00007 #include "EventFilter/StorageManager/interface/DiskWriterResources.h"
00008 #include "EventFilter/StorageManager/interface/EventDistributor.h"
00009 #include "EventFilter/StorageManager/interface/FragmentStore.h"
00010 #include "EventFilter/StorageManager/interface/I2OChain.h"
00011 #include "EventFilter/StorageManager/interface/Notifier.h"
00012 #include "EventFilter/StorageManager/interface/SharedResources.h"
00013 #include "EventFilter/StorageManager/interface/StateMachine.h"
00014 #include "EventFilter/StorageManager/interface/TransitionRecord.h"
00015
00016 #include <iostream>
00017 #include <unistd.h>
00018
00019 using namespace std;
00020 using namespace stor;
00021
00022 DrainingQueues::DrainingQueues( my_context c ): my_base(c)
00023 {
00024 safeEntryAction();
00025 }
00026
00027 void DrainingQueues::do_entryActionWork()
00028 {
00029 TransitionRecord tr( stateName(), true );
00030 outermost_context().updateHistory( tr );
00031 }
00032
00033 DrainingQueues::~DrainingQueues()
00034 {
00035 safeExitAction();
00036 }
00037
00038 void DrainingQueues::do_exitActionWork()
00039 {
00040 TransitionRecord tr( stateName(), false );
00041 outermost_context().updateHistory( tr );
00042 }
00043
00044 string DrainingQueues::do_stateName() const
00045 {
00046 return std::string( "DrainingQueues" );
00047 }
00048
00049 void DrainingQueues::do_moveToFailedState( xcept::Exception& exception ) const
00050 {
00051 outermost_context().getSharedResources()->moveToFailedState( exception );
00052 }
00053
00054 void DrainingQueues::logEndRunRequest( const EndRun& request )
00055 {
00056 outermost_context().unconsumed_event( request );
00057 }
00058
00059 void
00060 DrainingQueues::do_noFragmentToProcess() const
00061 {
00062 if ( allQueuesAndWorkersAreEmpty() )
00063 {
00064 SharedResourcesPtr sharedResources =
00065 outermost_context().getSharedResources();
00066 EventPtr_t stMachEvent( new QueuesEmpty() );
00067 sharedResources->commandQueue_->enqWait( stMachEvent );
00068 }
00069 }
00070
00071 bool
00072 DrainingQueues::allQueuesAndWorkersAreEmpty() const
00073 {
00074 SharedResourcesPtr sharedResources =
00075 outermost_context().getSharedResources();
00076
00077
00078
00079
00080 EventDistributor *ed = outermost_context().getEventDistributor();
00081 if ( ed->full() ) return false;
00082
00083 processStaleFragments();
00084 FragmentStore *fs = outermost_context().getFragmentStore();
00085 if ( ! fs->empty() ) return false;
00086
00087 if ( ! sharedResources->streamQueue_->empty() ) return false;
00088
00089 if ( sharedResources->diskWriterResources_->isBusy() ) return false;
00090
00091
00092
00093 sharedResources->dqmEventQueue_->clear();
00094
00095 return true;
00096 }
00097
00098 void
00099 DrainingQueues::processStaleFragments() const
00100 {
00101 I2OChain staleEvent;
00102 bool gotStaleEvent = true;
00103 int loopCounter = 0;
00104
00105 EventDistributor *ed = outermost_context().getEventDistributor();
00106
00107 while ( gotStaleEvent && !ed->full() && loopCounter++ < 10 )
00108 {
00109 gotStaleEvent =
00110 outermost_context().getFragmentStore()->getStaleEvent(staleEvent, boost::posix_time::seconds(0));
00111 if ( gotStaleEvent )
00112 {
00113 outermost_context().getSharedResources()->discardManager_->sendDiscardMessage(staleEvent);
00114 ed->addEventToRelevantQueues(staleEvent);
00115 }
00116 }
00117 }
00118
00119