CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_10/src/EventFilter/StorageManager/src/Processing.cc

Go to the documentation of this file.
00001 // $Id: Processing.cc,v 1.19 2011/11/08 10:48:41 mommsen Exp $
00003 
00004 #include "EventFilter/StorageManager/interface/AlarmHandler.h"
00005 #include "EventFilter/StorageManager/interface/EventDistributor.h"
00006 #include "EventFilter/StorageManager/interface/DiscardManager.h"
00007 #include "EventFilter/StorageManager/interface/FragmentStore.h"
00008 #include "EventFilter/StorageManager/interface/I2OChain.h"
00009 #include "EventFilter/StorageManager/interface/Notifier.h"
00010 #include "EventFilter/StorageManager/interface/SharedResources.h"
00011 #include "EventFilter/StorageManager/interface/StateMachine.h"
00012 #include "EventFilter/StorageManager/interface/StatisticsReporter.h"
00013 #include "EventFilter/StorageManager/interface/TransitionRecord.h"
00014 
00015 #include <iostream>
00016 #include <sstream>
00017 
00018 #include "xcept/tools.h"
00019 
00020 using namespace std;
00021 using namespace stor;
00022 
00023 Processing::Processing( my_context c ): my_base(c)
00024 {
00025   safeEntryAction();
00026 }
00027 
00028 void Processing::do_entryActionWork()
00029 {
00030   TransitionRecord tr( stateName(), true );
00031   outermost_context().updateHistory( tr );
00032   outermost_context().setExternallyVisibleState( "Enabled" );
00033   outermost_context().getNotifier()->reportNewState( "Enabled" );
00034 }
00035 
00036 Processing::~Processing()
00037 {
00038   safeExitAction();
00039 }
00040 
00041 void Processing::do_exitActionWork()
00042 {
00043   TransitionRecord tr( stateName(), false );
00044   outermost_context().updateHistory( tr );
00045 }
00046 
00047 string Processing::do_stateName() const
00048 {
00049   return std::string( "Processing" );
00050 }
00051 
00052 void Processing::do_moveToFailedState( xcept::Exception& exception ) const
00053 {
00054   outermost_context().getSharedResources()->alarmHandler_->moveToFailedState( exception );
00055 }
00056 
00057 void Processing::logEndRunRequest( const EndRun& request )
00058 {
00059   outermost_context().unconsumed_event( request );
00060 }
00061 
00062 void
00063 Processing::do_processI2OFragment( I2OChain& frag ) const
00064 {
00065   static unsigned int noFragmentCount = 0;
00066 
00067   FragmentStore* fragmentStore = outermost_context().getFragmentStore();
00068   bool completed = fragmentStore->addFragment(frag);
00069   if ( completed )
00070   {
00071     // The run number check has to be done before the event is added to the
00072     // queues, as for some event types, e.g. error events, the run number
00073     // match is enforced.
00074     try
00075     {
00076       uint32_t runNumber = outermost_context().getSharedResources()->configuration_->getRunNumber();
00077       frag.assertRunNumber(runNumber);
00078     }
00079     catch(stor::exception::RunNumberMismatch &e)
00080     {
00081       // Just raise an alarm, but continue to process the event
00082       outermost_context().getSharedResources()->
00083         alarmHandler_->notifySentinel(AlarmHandler::ERROR, e);
00084     }
00085     outermost_context().getEventDistributor()->addEventToRelevantQueues(frag);
00086     outermost_context().getSharedResources()->discardManager_->sendDiscardMessage(frag);
00087   }
00088   else
00089   {
00090     // Only do the check every 100th fragment
00091     // TODO: shall we make this number configurable?
00092     ++noFragmentCount;
00093     if ( noFragmentCount >= 100 )
00094     {
00095       noFragmentCount = 0;
00096       this->noFragmentToProcess();
00097     }
00098   }
00099 
00100   // 12-Aug-2009, KAB - I put the sampling of the fragment store size
00101   // *after* the code to add fragments to the store and move them from the
00102   // fragment store to the relevant queues (when needed) so that the baseline
00103   // number of events in the fragment store is zero.  For example, when
00104   // disk writing is slow or stopped, the stream queue fills up, and there
00105   // is backpressure within the SM, the true number of events in the fragment
00106   // store is zero, and putting the sampling here reflects that.
00107   ThroughputMonitorCollection& tmc = 
00108     outermost_context().getSharedResources()->statisticsReporter_->
00109     getThroughputMonitorCollection();
00110   tmc.setFragmentStoreSize(fragmentStore->size());
00111   tmc.setFragmentStoreMemoryUsed(fragmentStore->memoryUsed());
00112 }
00113 
00114 void
00115 Processing::do_noFragmentToProcess() const
00116 {
00117   I2OChain staleEvent;
00118 
00119   WorkerThreadParams workerParams =
00120     outermost_context().getSharedResources()->configuration_->getWorkerThreadParams();
00121   bool gotStaleEvent = 
00122     outermost_context().getFragmentStore()->
00123     getStaleEvent(staleEvent, workerParams.staleFragmentTimeOut_);
00124 
00125   if ( gotStaleEvent )
00126   {
00127     outermost_context().getSharedResources()->discardManager_->sendDiscardMessage(staleEvent);
00128     outermost_context().getEventDistributor()->addEventToRelevantQueues(staleEvent);
00129   }
00130   outermost_context().getEventDistributor()->checkForStaleConsumers();
00131 }
00132 
00133