CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch9/src/EventFilter/StorageManager/src/EventDistributor.cc

Go to the documentation of this file.
00001 // $Id: EventDistributor.cc,v 1.24 2011/03/07 15:31:32 mommsen Exp $
00003 
00004 #include "EventFilter/StorageManager/interface/DataSenderMonitorCollection.h"
00005 #include "EventFilter/StorageManager/interface/DQMEventConsumerRegistrationInfo.h"
00006 #include "EventFilter/StorageManager/interface/DQMEventSelector.h"
00007 #include "EventFilter/StorageManager/interface/ErrorStreamConfigurationInfo.h"
00008 #include "EventFilter/StorageManager/interface/ErrorStreamSelector.h"
00009 #include "EventFilter/StorageManager/interface/EventConsumerRegistrationInfo.h"
00010 #include "EventFilter/StorageManager/interface/EventConsumerSelector.h"
00011 #include "EventFilter/StorageManager/interface/EventDistributor.h"
00012 #include "EventFilter/StorageManager/interface/EventStreamConfigurationInfo.h"
00013 #include "EventFilter/StorageManager/interface/EventStreamSelector.h"
00014 #include "EventFilter/StorageManager/interface/I2OChain.h"
00015 #include "EventFilter/StorageManager/interface/InitMsgCollection.h"
00016 #include "EventFilter/StorageManager/interface/QueueID.h"
00017 #include "EventFilter/StorageManager/interface/RegistrationCollection.h"
00018 #include "EventFilter/StorageManager/interface/RunMonitorCollection.h"
00019 #include "EventFilter/StorageManager/interface/StatisticsReporter.h"
00020 #include "EventFilter/StorageManager/interface/Exception.h"
00021 
00022 #include "EventFilter/Utilities/interface/i2oEvfMsgs.h"
00023 
00024 using namespace stor;
00025 
00026 
00027 EventDistributor::EventDistributor(SharedResourcesPtr sr):
00028   sharedResources_(sr)
00029 {}
00030 
00031 
00032 EventDistributor::~EventDistributor()
00033 {
00034   clearStreams();
00035   clearConsumers();
00036 }
00037 
00038 void EventDistributor::addEventToRelevantQueues( I2OChain& ioc )
00039 {
00040   // special handling for faulty or incomplete events
00041   if ( ioc.faulty() || !ioc.complete() )
00042   {
00043     std::ostringstream msg;
00044     msg << "Faulty or incomplete I2OChain for event " 
00045       << ioc.fragmentKey().event_
00046       << ": 0x" << std::hex << ioc.faultyBits()
00047       << " received from " << ioc.hltURL()
00048       << " (rbBufferId " << ioc.rbBufferId() << ").";
00049     XCEPT_DECLARE( stor::exception::IncompleteEventMessage,
00050       xcept, msg.str());
00051     sharedResources_->statisticsReporter_->alarmHandler()->
00052       notifySentinel(AlarmHandler::ERROR, xcept);
00053 
00054     DataSenderMonitorCollection& dataSenderMonColl =
00055       sharedResources_->statisticsReporter_->getDataSenderMonitorCollection();
00056     dataSenderMonColl.addFaultyEventSample(ioc);
00057 
00058     if ( !( sharedResources_->configuration_->getDiskWritingParams().faultyEventsStream_.empty() ) &&
00059       ( ioc.i2oMessageCode() == I2O_SM_DATA || ioc.i2oMessageCode() == I2O_SM_ERROR) )
00060       ioc.tagForStream(0); // special stream for faulty events
00061   }
00062   else
00063   {
00064     tagCompleteEventForQueues( ioc );
00065   }
00066 
00067   // Check if event belongs here at all:
00068   bool unexpected = true;
00069 
00070   if( ioc.isTaggedForAnyStream() )
00071   {
00072     unexpected = false;
00073     sharedResources_->streamQueue_->enqWait( ioc );
00074   }
00075   
00076   if( ioc.isTaggedForAnyEventConsumer() )
00077   {
00078     unexpected = false;
00079     sharedResources_->eventQueueCollection_->addEvent( ioc );
00080   }
00081 
00082   if( unexpected && ioc.messageCode() == Header::EVENT )
00083   {
00084     RunMonitorCollection& runMonColl =
00085       sharedResources_->statisticsReporter_->getRunMonitorCollection();
00086     runMonColl.addUnwantedEvent(ioc);
00087   }
00088 }
00089 
00090 void EventDistributor::tagCompleteEventForQueues( I2OChain& ioc )
00091 {
00092   switch( ioc.messageCode() )
00093   {
00094     
00095     case Header::INIT:
00096     {
00097       std::vector<unsigned char> b;
00098       ioc.copyFragmentsIntoBuffer(b);
00099       InitMsgView imv( &b[0] );
00100       if( sharedResources_->initMsgCollection_->addIfUnique( imv ) )
00101       {
00102         try
00103         {
00104           for_each(eventStreamSelectors_.begin(),eventStreamSelectors_.end(),
00105             boost::bind(&EventStreamSelector::initialize, _1, imv));
00106 
00107           for_each(eventConsumerSelectors_.begin(), eventConsumerSelectors_.end(),
00108             boost::bind(&EventConsumerSelector::initialize, _1, imv));
00109         }
00110         catch( stor::exception::InvalidEventSelection& e )
00111         {
00112           sharedResources_->statisticsReporter_->alarmHandler()->
00113             notifySentinel(AlarmHandler::ERROR,e);
00114         }
00115       }
00116       
00117       DataSenderMonitorCollection& dataSenderMonColl = sharedResources_->
00118         statisticsReporter_->getDataSenderMonitorCollection();
00119       dataSenderMonColl.addInitSample(ioc);
00120       
00121       break;
00122     }
00123     
00124     case Header::EVENT:
00125     {
00126       for( EvtSelList::iterator it = eventStreamSelectors_.begin(),
00127              itEnd = eventStreamSelectors_.end();
00128            it != itEnd;
00129            ++it )
00130       {
00131         if( (*it)->acceptEvent( ioc ) )
00132         {
00133           ioc.tagForStream( (*it)->configInfo().streamId() );
00134         }
00135       }
00136       for( ConsSelList::iterator it = eventConsumerSelectors_.begin(),
00137              itEnd = eventConsumerSelectors_.end();
00138            it != itEnd;
00139            ++it )
00140       {
00141         if( (*it)->acceptEvent( ioc ) )
00142         {
00143           ioc.tagForEventConsumer( (*it)->queueId() );
00144         }
00145       }
00146       
00147       RunMonitorCollection& runMonCollection = sharedResources_->
00148         statisticsReporter_->getRunMonitorCollection();
00149       runMonCollection.getRunNumbersSeenMQ().addSampleIfLarger(ioc.runNumber());
00150       runMonCollection.getLumiSectionsSeenMQ().addSampleIfLarger(ioc.lumiSection());
00151       runMonCollection.getEventIDsReceivedMQ().addSample(ioc.eventNumber());
00152       
00153       DataSenderMonitorCollection& dataSenderMonColl = sharedResources_->
00154         statisticsReporter_->getDataSenderMonitorCollection();
00155       dataSenderMonColl.addEventSample(ioc);
00156 
00157       break;
00158     }
00159     
00160     case Header::DQM_EVENT:
00161     {
00162       utils::TimePoint_t now = utils::getCurrentTime();
00163 
00164       for( DQMEvtSelList::iterator it = dqmEventSelectors_.begin(),
00165              itEnd = dqmEventSelectors_.end();
00166            it != itEnd;
00167            ++it)
00168       {
00169         if( (*it)->acceptEvent( ioc, now ) )
00170         {
00171           ioc.tagForDQMEventConsumer( (*it)->queueId() );
00172         }
00173       }
00174       
00175       DataSenderMonitorCollection& dataSenderMonColl = sharedResources_->
00176         statisticsReporter_->getDataSenderMonitorCollection();
00177       dataSenderMonColl.addDQMEventSample(ioc);
00178 
00179       if( ioc.isTaggedForAnyDQMEventConsumer() )
00180       {
00181         sharedResources_->dqmEventQueue_->enqNowait( ioc );
00182       }
00183       else
00184       {
00185         sharedResources_->statisticsReporter_->getDQMEventMonitorCollection().
00186           getDroppedDQMEventCountsMQ().addSample(1);
00187       }
00188       
00189       break;
00190     }
00191     
00192     case Header::ERROR_EVENT:
00193     {
00194       for( ErrSelList::iterator it = errorStreamSelectors_.begin(),
00195              itEnd = errorStreamSelectors_.end();
00196            it != itEnd;
00197            ++it )
00198       {
00199         if( (*it)->acceptEvent( ioc ) )
00200         {
00201           ioc.tagForStream( (*it)->configInfo().streamId() );
00202         }
00203       }
00204       
00205       RunMonitorCollection& runMonCollection = sharedResources_->
00206         statisticsReporter_->getRunMonitorCollection();
00207       runMonCollection.getRunNumbersSeenMQ().addSample(ioc.runNumber());
00208       runMonCollection.getLumiSectionsSeenMQ().addSampleIfLarger(ioc.lumiSection());
00209       runMonCollection.getErrorEventIDsReceivedMQ().addSample(ioc.eventNumber());
00210       
00211       DataSenderMonitorCollection& dataSenderMonColl = sharedResources_->
00212         statisticsReporter_->getDataSenderMonitorCollection();
00213       dataSenderMonColl.addErrorEventSample(ioc);
00214 
00215       break;
00216     }
00217     
00218     default:
00219     {
00220       std::ostringstream msg;
00221       msg << "I2OChain with unknown message type " <<
00222         ioc.messageCode();
00223       XCEPT_DECLARE( stor::exception::WrongI2OMessageType,
00224                      xcept, msg.str());
00225       sharedResources_->statisticsReporter_->
00226         alarmHandler()->notifySentinel(AlarmHandler::ERROR, xcept);
00227 
00228       // 24-Jun-2009, KAB - this is not really the best way to track this,
00229       // but it's probably better than nothing in the short term.
00230       DataSenderMonitorCollection& dataSenderMonColl = sharedResources_->
00231         statisticsReporter_->getDataSenderMonitorCollection();
00232       dataSenderMonColl.addFaultyEventSample(ioc);
00233 
00234       break;
00235     }
00236   }
00237 }
00238 
00239 const bool EventDistributor::full() const
00240 {
00241   return sharedResources_->streamQueue_->full();
00242 }
00243 
00244 
00245 void EventDistributor::registerEventConsumer
00246 (
00247   const EventConsRegPtr regPtr
00248 )
00249 {
00250   ConsSelPtr evtSel( new EventConsumerSelector(regPtr) );
00251 
00252   InitMsgSharedPtr initMsgPtr =
00253     sharedResources_->initMsgCollection_->getElementForOutputModule(
00254       regPtr->outputModuleLabel()
00255     );
00256   if ( initMsgPtr.get() != 0 )
00257   {
00258     uint8* initPtr = &(*initMsgPtr)[0];
00259     InitMsgView initView(initPtr);
00260     try
00261     {
00262       evtSel->initialize( initView );
00263     }
00264     catch( stor::exception::InvalidEventSelection& e )
00265     {
00266       sharedResources_->statisticsReporter_->alarmHandler()->
00267         notifySentinel(AlarmHandler::ERROR, e);
00268     }
00269   }
00270   
00271   eventConsumerSelectors_.insert( evtSel );
00272 }
00273 
00274 void EventDistributor::registerDQMEventConsumer( const DQMEventConsRegPtr ptr )
00275 {
00276   DQMEvtSelPtr dqmEvtSel( new DQMEventSelector(ptr) );
00277   dqmEventSelectors_.insert( dqmEvtSel );
00278 }
00279 
00280 void EventDistributor::registerEventStreams( const EvtStrConfigListPtr cl )
00281 {
00282   for( EvtStrConfigList::const_iterator it = cl->begin(), itEnd = cl->end();
00283        it != itEnd;
00284        ++it )
00285   {
00286     EvtSelPtr evtSel( new EventStreamSelector(*it) );
00287     eventStreamSelectors_.insert( evtSel );
00288   }
00289 }
00290 
00291 
00292 void EventDistributor::registerErrorStreams( const ErrStrConfigListPtr cl )
00293 {
00294   for( ErrStrConfigList::const_iterator it = cl->begin(), itEnd = cl->end();
00295        it != itEnd;
00296        ++it )
00297   {
00298     ErrSelPtr errSel( new ErrorStreamSelector(*it) );
00299     errorStreamSelectors_.insert( errSel );
00300   }
00301 }
00302 
00303 
00304 void EventDistributor::clearStreams()
00305 {
00306   eventStreamSelectors_.clear();
00307   errorStreamSelectors_.clear();
00308 }
00309 
00310 
00311 unsigned int EventDistributor::configuredStreamCount() const
00312 {
00313   return eventStreamSelectors_.size() +
00314     errorStreamSelectors_.size();
00315 }
00316 
00317 
00318 unsigned int EventDistributor::initializedStreamCount() const
00319 {
00320   unsigned int counter = 0;
00321   for (EvtSelList::const_iterator it = eventStreamSelectors_.begin(),
00322          itEnd = eventStreamSelectors_.end();
00323        it != itEnd;
00324        ++it)
00325   {
00326     if ( (*it)->isInitialized() )
00327       ++counter;
00328   }
00329   return counter;
00330 }
00331 
00332 
00333 void EventDistributor::clearConsumers()
00334 {
00335   eventConsumerSelectors_.clear();
00336   dqmEventSelectors_.clear();
00337 }
00338 
00339 
00340 unsigned int EventDistributor::configuredConsumerCount() const
00341 {
00342   return eventConsumerSelectors_.size() + dqmEventSelectors_.size();
00343 }
00344 
00345 
00346 unsigned int EventDistributor::initializedConsumerCount() const
00347 {
00348   unsigned int counter = 0;
00349   for (ConsSelList::const_iterator it = eventConsumerSelectors_.begin(),
00350          itEnd = eventConsumerSelectors_.end();
00351        it != itEnd;
00352        ++it)
00353   {
00354     if ( (*it)->isInitialized() )
00355       ++counter;
00356   }
00357   return counter;
00358 }
00359 
00360 
00361 void EventDistributor::checkForStaleConsumers()
00362 {
00363   utils::TimePoint_t now = utils::getCurrentTime();
00364 
00365   EventQueueCollectionPtr eqc =
00366     sharedResources_->eventQueueCollection_;
00367   eqc->clearStaleQueues(now);
00368 
00369   DQMEventQueueCollectionPtr dqc =
00370     sharedResources_->dqmEventQueueCollection_;
00371   dqc->clearStaleQueues(now);
00372 }
00373 
00374