CMS 3D CMS Logo

/data/doxygen/doxygen-1.7.3/gen/CMSSW_4_2_8/src/EventFilter/StorageManager/src/DrainingQueues.cc

Go to the documentation of this file.
00001 // $Id: DrainingQueues.cc,v 1.13 2011/03/07 15:31:32 mommsen Exp $
00003 
00004 #include "EventFilter/StorageManager/interface/CommandQueue.h"
00005 #include "EventFilter/StorageManager/interface/DiscardManager.h"
00006 #include "EventFilter/StorageManager/interface/DiskWriter.h"
00007 #include "EventFilter/StorageManager/interface/DiskWriterResources.h"
00008 #include "EventFilter/StorageManager/interface/EventDistributor.h"
00009 #include "EventFilter/StorageManager/interface/FragmentStore.h"
00010 #include "EventFilter/StorageManager/interface/I2OChain.h"
00011 #include "EventFilter/StorageManager/interface/Notifier.h"
00012 #include "EventFilter/StorageManager/interface/SharedResources.h"
00013 #include "EventFilter/StorageManager/interface/StateMachine.h"
00014 #include "EventFilter/StorageManager/interface/TransitionRecord.h"
00015 
00016 #include <iostream>
00017 #include <unistd.h>
00018 
00019 using namespace std;
00020 using namespace stor;
00021 
00022 DrainingQueues::DrainingQueues( my_context c ): my_base(c)
00023 {
00024   safeEntryAction();
00025 }
00026 
00027 void DrainingQueues::do_entryActionWork()
00028 {
00029   TransitionRecord tr( stateName(), true );
00030   outermost_context().updateHistory( tr );
00031 }
00032 
00033 DrainingQueues::~DrainingQueues()
00034 {
00035   safeExitAction();
00036 }
00037 
00038 void DrainingQueues::do_exitActionWork()
00039 {
00040   TransitionRecord tr( stateName(), false );
00041   outermost_context().updateHistory( tr );
00042 }
00043 
00044 string DrainingQueues::do_stateName() const
00045 {
00046   return std::string( "DrainingQueues" );
00047 }
00048 
00049 void DrainingQueues::do_moveToFailedState( xcept::Exception& exception ) const
00050 {
00051   outermost_context().getSharedResources()->moveToFailedState( exception );
00052 }
00053 
00054 void DrainingQueues::logEndRunRequest( const EndRun& request )
00055 {
00056   outermost_context().unconsumed_event( request );
00057 }
00058 
00059 void
00060 DrainingQueues::do_noFragmentToProcess() const
00061 {
00062   if ( allQueuesAndWorkersAreEmpty() )
00063   {
00064     SharedResourcesPtr sharedResources =
00065       outermost_context().getSharedResources();
00066     EventPtr_t stMachEvent( new QueuesEmpty() );
00067     sharedResources->commandQueue_->enqWait( stMachEvent );
00068   }
00069 }
00070 
00071 bool
00072 DrainingQueues::allQueuesAndWorkersAreEmpty() const
00073 {
00074   SharedResourcesPtr sharedResources = 
00075     outermost_context().getSharedResources();
00076 
00077   // the order is important here - upstream entities first,
00078   // followed by more downstream entities
00079 
00080   EventDistributor *ed = outermost_context().getEventDistributor();
00081   if ( ed->full() ) return false;
00082 
00083   processStaleFragments();
00084   FragmentStore *fs = outermost_context().getFragmentStore();
00085   if ( ! fs->empty() ) return false;
00086 
00087   if ( ! sharedResources->streamQueue_->empty() ) return false;
00088 
00089   if ( sharedResources->diskWriterResources_->isBusy() ) return false;
00090   
00091   //if ( ! sharedResources->dqmEventQueue_->empty() ) return false;
00092   // Do not wait for dqmEventQueue to drain, just clear it
00093   sharedResources->dqmEventQueue_->clear();
00094 
00095   return true;
00096 }
00097 
00098 void
00099 DrainingQueues::processStaleFragments() const
00100 {
00101   I2OChain staleEvent;
00102   bool gotStaleEvent = true;  
00103   int loopCounter = 0;
00104 
00105   EventDistributor *ed = outermost_context().getEventDistributor();
00106 
00107   while ( gotStaleEvent && !ed->full() && loopCounter++ < 10 )
00108   {
00109     gotStaleEvent = 
00110       outermost_context().getFragmentStore()->getStaleEvent(staleEvent, boost::posix_time::seconds(0));
00111     if ( gotStaleEvent )
00112     {
00113       outermost_context().getSharedResources()->discardManager_->sendDiscardMessage(staleEvent);
00114       ed->addEventToRelevantQueues(staleEvent);
00115     }
00116   }
00117 }
00118 
00119