CMS 3D CMS Logo

/data/doxygen/doxygen-1.7.3/gen/CMSSW_4_2_8/src/EventFilter/StorageManager/src/Processing.cc

Go to the documentation of this file.
00001 // $Id: Processing.cc,v 1.18 2011/03/07 15:31:32 mommsen Exp $
00003 
00004 #include "EventFilter/StorageManager/interface/EventDistributor.h"
00005 #include "EventFilter/StorageManager/interface/DiscardManager.h"
00006 #include "EventFilter/StorageManager/interface/FragmentStore.h"
00007 #include "EventFilter/StorageManager/interface/I2OChain.h"
00008 #include "EventFilter/StorageManager/interface/Notifier.h"
00009 #include "EventFilter/StorageManager/interface/SharedResources.h"
00010 #include "EventFilter/StorageManager/interface/StateMachine.h"
00011 #include "EventFilter/StorageManager/interface/StatisticsReporter.h"
00012 #include "EventFilter/StorageManager/interface/TransitionRecord.h"
00013 
00014 #include <iostream>
00015 #include <sstream>
00016 
00017 #include "xcept/tools.h"
00018 
00019 using namespace std;
00020 using namespace stor;
00021 
00022 Processing::Processing( my_context c ): my_base(c)
00023 {
00024   safeEntryAction();
00025 }
00026 
00027 void Processing::do_entryActionWork()
00028 {
00029   TransitionRecord tr( stateName(), true );
00030   outermost_context().updateHistory( tr );
00031   outermost_context().setExternallyVisibleState( "Enabled" );
00032   outermost_context().getNotifier()->reportNewState( "Enabled" );
00033 }
00034 
00035 Processing::~Processing()
00036 {
00037   safeExitAction();
00038 }
00039 
00040 void Processing::do_exitActionWork()
00041 {
00042   TransitionRecord tr( stateName(), false );
00043   outermost_context().updateHistory( tr );
00044 }
00045 
00046 string Processing::do_stateName() const
00047 {
00048   return std::string( "Processing" );
00049 }
00050 
00051 void Processing::do_moveToFailedState( xcept::Exception& exception ) const
00052 {
00053   outermost_context().getSharedResources()->moveToFailedState( exception );
00054 }
00055 
00056 void Processing::logEndRunRequest( const EndRun& request )
00057 {
00058   outermost_context().unconsumed_event( request );
00059 }
00060 
00061 void
00062 Processing::do_processI2OFragment( I2OChain& frag ) const
00063 {
00064   static unsigned int noFragmentCount = 0;
00065 
00066   FragmentStore* fragmentStore = outermost_context().getFragmentStore();
00067   bool completed = fragmentStore->addFragment(frag);
00068   if ( completed )
00069   {
00070     // The run number check has to be done before the event is added to the
00071     // queues, as for some event types, e.g. error events, the run number
00072     // match is enforced.
00073     try
00074     {
00075       uint32_t runNumber = outermost_context().getSharedResources()->configuration_->getRunNumber();
00076       frag.assertRunNumber(runNumber);
00077     }
00078     catch(stor::exception::RunNumberMismatch &e)
00079     {
00080       // Just raise an alarm, but continue to process the event
00081       outermost_context().getSharedResources()->statisticsReporter_->
00082         alarmHandler()->notifySentinel(AlarmHandler::ERROR, e);
00083     }
00084     outermost_context().getEventDistributor()->addEventToRelevantQueues(frag);
00085     outermost_context().getSharedResources()->discardManager_->sendDiscardMessage(frag);
00086   }
00087   else
00088   {
00089     // Only do the check every 100th fragment
00090     // TODO: shall we make this number configurable?
00091     ++noFragmentCount;
00092     if ( noFragmentCount >= 100 )
00093     {
00094       noFragmentCount = 0;
00095       this->noFragmentToProcess();
00096     }
00097   }
00098 
00099   // 12-Aug-2009, KAB - I put the sampling of the fragment store size
00100   // *after* the code to add fragments to the store and move them from the
00101   // fragment store to the relevant queues (when needed) so that the baseline
00102   // number of events in the fragment store is zero.  For example, when
00103   // disk writing is slow or stopped, the stream queue fills up, and there
00104   // is backpressure within the SM, the true number of events in the fragment
00105   // store is zero, and putting the sampling here reflects that.
00106   ThroughputMonitorCollection& tmc = 
00107     outermost_context().getSharedResources()->statisticsReporter_->
00108     getThroughputMonitorCollection();
00109   tmc.setFragmentStoreSize(fragmentStore->size());
00110   tmc.setFragmentStoreMemoryUsed(fragmentStore->memoryUsed());
00111 }
00112 
00113 void
00114 Processing::do_noFragmentToProcess() const
00115 {
00116   I2OChain staleEvent;
00117 
00118   WorkerThreadParams workerParams =
00119     outermost_context().getSharedResources()->configuration_->getWorkerThreadParams();
00120   bool gotStaleEvent = 
00121     outermost_context().getFragmentStore()->
00122     getStaleEvent(staleEvent, workerParams.staleFragmentTimeOut_);
00123 
00124   if ( gotStaleEvent )
00125   {
00126     outermost_context().getSharedResources()->discardManager_->sendDiscardMessage(staleEvent);
00127     outermost_context().getEventDistributor()->addEventToRelevantQueues(staleEvent);
00128   }
00129   outermost_context().getEventDistributor()->checkForStaleConsumers();
00130 }
00131 
00132