CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_1/src/EventFilter/StorageManager/src/DrainingQueues.cc

Go to the documentation of this file.
00001 // $Id: DrainingQueues.cc,v 1.14 2011/11/08 10:48:40 mommsen Exp $
00003 
00004 #include "EventFilter/StorageManager/interface/AlarmHandler.h"
00005 #include "EventFilter/StorageManager/interface/CommandQueue.h"
00006 #include "EventFilter/StorageManager/interface/DiscardManager.h"
00007 #include "EventFilter/StorageManager/interface/DiskWriter.h"
00008 #include "EventFilter/StorageManager/interface/DiskWriterResources.h"
00009 #include "EventFilter/StorageManager/interface/EventDistributor.h"
00010 #include "EventFilter/StorageManager/interface/FragmentStore.h"
00011 #include "EventFilter/StorageManager/interface/I2OChain.h"
00012 #include "EventFilter/StorageManager/interface/Notifier.h"
00013 #include "EventFilter/StorageManager/interface/SharedResources.h"
00014 #include "EventFilter/StorageManager/interface/StateMachine.h"
00015 #include "EventFilter/StorageManager/interface/TransitionRecord.h"
00016 
00017 #include <iostream>
00018 #include <unistd.h>
00019 
00020 using namespace std;
00021 using namespace stor;
00022 
00023 DrainingQueues::DrainingQueues( my_context c ): my_base(c)
00024 {
00025   safeEntryAction();
00026 }
00027 
00028 void DrainingQueues::do_entryActionWork()
00029 {
00030   TransitionRecord tr( stateName(), true );
00031   outermost_context().updateHistory( tr );
00032 }
00033 
00034 DrainingQueues::~DrainingQueues()
00035 {
00036   safeExitAction();
00037 }
00038 
00039 void DrainingQueues::do_exitActionWork()
00040 {
00041   TransitionRecord tr( stateName(), false );
00042   outermost_context().updateHistory( tr );
00043 }
00044 
00045 string DrainingQueues::do_stateName() const
00046 {
00047   return std::string( "DrainingQueues" );
00048 }
00049 
00050 void DrainingQueues::do_moveToFailedState( xcept::Exception& exception ) const
00051 {
00052   outermost_context().getSharedResources()->alarmHandler_->moveToFailedState( exception );
00053 }
00054 
00055 void DrainingQueues::logEndRunRequest( const EndRun& request )
00056 {
00057   outermost_context().unconsumed_event( request );
00058 }
00059 
00060 void
00061 DrainingQueues::do_noFragmentToProcess() const
00062 {
00063   if ( allQueuesAndWorkersAreEmpty() )
00064   {
00065     SharedResourcesPtr sharedResources =
00066       outermost_context().getSharedResources();
00067     EventPtr_t stMachEvent( new QueuesEmpty() );
00068     sharedResources->commandQueue_->enqWait( stMachEvent );
00069   }
00070 }
00071 
00072 bool
00073 DrainingQueues::allQueuesAndWorkersAreEmpty() const
00074 {
00075   SharedResourcesPtr sharedResources = 
00076     outermost_context().getSharedResources();
00077 
00078   // the order is important here - upstream entities first,
00079   // followed by more downstream entities
00080 
00081   EventDistributor *ed = outermost_context().getEventDistributor();
00082   if ( ed->full() ) return false;
00083 
00084   processStaleFragments();
00085   FragmentStore *fs = outermost_context().getFragmentStore();
00086   if ( ! fs->empty() ) return false;
00087 
00088   if ( ! sharedResources->streamQueue_->empty() ) return false;
00089 
00090   if ( sharedResources->diskWriterResources_->isBusy() ) return false;
00091   
00092   //if ( ! sharedResources->dqmEventQueue_->empty() ) return false;
00093   // Do not wait for dqmEventQueue to drain, just clear it
00094   sharedResources->dqmEventQueue_->clear();
00095 
00096   return true;
00097 }
00098 
00099 void
00100 DrainingQueues::processStaleFragments() const
00101 {
00102   I2OChain staleEvent;
00103   bool gotStaleEvent = true;  
00104   int loopCounter = 0;
00105 
00106   EventDistributor *ed = outermost_context().getEventDistributor();
00107 
00108   while ( gotStaleEvent && !ed->full() && loopCounter++ < 10 )
00109   {
00110     gotStaleEvent = 
00111       outermost_context().getFragmentStore()->getStaleEvent(staleEvent, boost::posix_time::seconds(0));
00112     if ( gotStaleEvent )
00113     {
00114       outermost_context().getSharedResources()->discardManager_->sendDiscardMessage(staleEvent);
00115       ed->addEventToRelevantQueues(staleEvent);
00116     }
00117   }
00118 }
00119 
00120