Go to the documentation of this file.00001
00003
00004 #include "EventFilter/StorageManager/interface/DataSenderMonitorCollection.h"
00005 #include "EventFilter/StorageManager/interface/DQMEventConsumerRegistrationInfo.h"
00006 #include "EventFilter/StorageManager/interface/DQMEventSelector.h"
00007 #include "EventFilter/StorageManager/interface/ErrorStreamConfigurationInfo.h"
00008 #include "EventFilter/StorageManager/interface/ErrorStreamSelector.h"
00009 #include "EventFilter/StorageManager/interface/EventConsumerRegistrationInfo.h"
00010 #include "EventFilter/StorageManager/interface/EventConsumerSelector.h"
00011 #include "EventFilter/StorageManager/interface/EventDistributor.h"
00012 #include "EventFilter/StorageManager/interface/EventStreamConfigurationInfo.h"
00013 #include "EventFilter/StorageManager/interface/EventStreamSelector.h"
00014 #include "EventFilter/StorageManager/interface/I2OChain.h"
00015 #include "EventFilter/StorageManager/interface/InitMsgCollection.h"
00016 #include "EventFilter/StorageManager/interface/QueueID.h"
00017 #include "EventFilter/StorageManager/interface/RegistrationCollection.h"
00018 #include "EventFilter/StorageManager/interface/RunMonitorCollection.h"
00019 #include "EventFilter/StorageManager/interface/StatisticsReporter.h"
00020 #include "EventFilter/StorageManager/interface/Exception.h"
00021
00022 #include "EventFilter/Utilities/interface/i2oEvfMsgs.h"
00023
00024 using namespace stor;
00025
00026
00027 EventDistributor::EventDistributor(SharedResourcesPtr sr):
00028 sharedResources_(sr)
00029 {}
00030
00031
00032 EventDistributor::~EventDistributor()
00033 {
00034 clearStreams();
00035 clearConsumers();
00036 }
00037
00038 void EventDistributor::addEventToRelevantQueues( I2OChain& ioc )
00039 {
00040
00041 if ( ioc.faulty() || !ioc.complete() )
00042 {
00043 std::ostringstream msg;
00044 msg << "Faulty or incomplete I2OChain for event "
00045 << ioc.fragmentKey().event_
00046 << ": 0x" << std::hex << ioc.faultyBits()
00047 << " received from " << ioc.hltURL()
00048 << " (rbBufferId " << ioc.rbBufferId() << ").";
00049 XCEPT_DECLARE( stor::exception::IncompleteEventMessage,
00050 xcept, msg.str());
00051 sharedResources_->statisticsReporter_->alarmHandler()->
00052 notifySentinel(AlarmHandler::ERROR, xcept);
00053
00054 DataSenderMonitorCollection& dataSenderMonColl =
00055 sharedResources_->statisticsReporter_->getDataSenderMonitorCollection();
00056 dataSenderMonColl.addFaultyEventSample(ioc);
00057
00058 if ( !( sharedResources_->configuration_->getDiskWritingParams().faultyEventsStream_.empty() ) &&
00059 ( ioc.i2oMessageCode() == I2O_SM_DATA || ioc.i2oMessageCode() == I2O_SM_ERROR) )
00060 ioc.tagForStream(0);
00061 }
00062 else
00063 {
00064 tagCompleteEventForQueues( ioc );
00065 }
00066
00067
00068 bool unexpected = true;
00069
00070 if( ioc.isTaggedForAnyStream() )
00071 {
00072 unexpected = false;
00073 sharedResources_->streamQueue_->enqWait( ioc );
00074 }
00075
00076 if( ioc.isTaggedForAnyEventConsumer() )
00077 {
00078 unexpected = false;
00079 sharedResources_->eventQueueCollection_->addEvent( ioc );
00080 }
00081
00082 if( unexpected && ioc.messageCode() == Header::EVENT )
00083 {
00084 RunMonitorCollection& runMonColl =
00085 sharedResources_->statisticsReporter_->getRunMonitorCollection();
00086 runMonColl.addUnwantedEvent(ioc);
00087 }
00088 }
00089
00090 void EventDistributor::tagCompleteEventForQueues( I2OChain& ioc )
00091 {
00092 switch( ioc.messageCode() )
00093 {
00094
00095 case Header::INIT:
00096 {
00097 std::vector<unsigned char> b;
00098 ioc.copyFragmentsIntoBuffer(b);
00099 InitMsgView imv( &b[0] );
00100 if( sharedResources_->initMsgCollection_->addIfUnique( imv ) )
00101 {
00102 try
00103 {
00104 for_each(eventStreamSelectors_.begin(),eventStreamSelectors_.end(),
00105 boost::bind(&EventStreamSelector::initialize, _1, imv));
00106
00107 for_each(eventConsumerSelectors_.begin(), eventConsumerSelectors_.end(),
00108 boost::bind(&EventConsumerSelector::initialize, _1, imv));
00109 }
00110 catch( stor::exception::InvalidEventSelection& e )
00111 {
00112 sharedResources_->statisticsReporter_->alarmHandler()->
00113 notifySentinel(AlarmHandler::ERROR,e);
00114 }
00115 }
00116
00117 DataSenderMonitorCollection& dataSenderMonColl = sharedResources_->
00118 statisticsReporter_->getDataSenderMonitorCollection();
00119 dataSenderMonColl.addInitSample(ioc);
00120
00121 break;
00122 }
00123
00124 case Header::EVENT:
00125 {
00126 for( EvtSelList::iterator it = eventStreamSelectors_.begin(),
00127 itEnd = eventStreamSelectors_.end();
00128 it != itEnd;
00129 ++it )
00130 {
00131 if( (*it)->acceptEvent( ioc ) )
00132 {
00133 ioc.tagForStream( (*it)->configInfo().streamId() );
00134 }
00135 }
00136 for( ConsSelList::iterator it = eventConsumerSelectors_.begin(),
00137 itEnd = eventConsumerSelectors_.end();
00138 it != itEnd;
00139 ++it )
00140 {
00141 if( (*it)->acceptEvent( ioc ) )
00142 {
00143 ioc.tagForEventConsumer( (*it)->queueId() );
00144 }
00145 }
00146
00147 RunMonitorCollection& runMonCollection = sharedResources_->
00148 statisticsReporter_->getRunMonitorCollection();
00149 runMonCollection.getRunNumbersSeenMQ().addSampleIfLarger(ioc.runNumber());
00150 runMonCollection.getLumiSectionsSeenMQ().addSampleIfLarger(ioc.lumiSection());
00151 runMonCollection.getEventIDsReceivedMQ().addSample(ioc.eventNumber());
00152
00153 DataSenderMonitorCollection& dataSenderMonColl = sharedResources_->
00154 statisticsReporter_->getDataSenderMonitorCollection();
00155 dataSenderMonColl.addEventSample(ioc);
00156
00157 break;
00158 }
00159
00160 case Header::DQM_EVENT:
00161 {
00162 utils::TimePoint_t now = utils::getCurrentTime();
00163
00164 for( DQMEvtSelList::iterator it = dqmEventSelectors_.begin(),
00165 itEnd = dqmEventSelectors_.end();
00166 it != itEnd;
00167 ++it)
00168 {
00169 if( (*it)->acceptEvent( ioc, now ) )
00170 {
00171 ioc.tagForDQMEventConsumer( (*it)->queueId() );
00172 }
00173 }
00174
00175 DataSenderMonitorCollection& dataSenderMonColl = sharedResources_->
00176 statisticsReporter_->getDataSenderMonitorCollection();
00177 dataSenderMonColl.addDQMEventSample(ioc);
00178
00179 if( ioc.isTaggedForAnyDQMEventConsumer() )
00180 {
00181 sharedResources_->dqmEventQueue_->enqNowait( ioc );
00182 }
00183 else
00184 {
00185 sharedResources_->statisticsReporter_->getDQMEventMonitorCollection().
00186 getDroppedDQMEventCountsMQ().addSample(1);
00187 }
00188
00189 break;
00190 }
00191
00192 case Header::ERROR_EVENT:
00193 {
00194 for( ErrSelList::iterator it = errorStreamSelectors_.begin(),
00195 itEnd = errorStreamSelectors_.end();
00196 it != itEnd;
00197 ++it )
00198 {
00199 if( (*it)->acceptEvent( ioc ) )
00200 {
00201 ioc.tagForStream( (*it)->configInfo().streamId() );
00202 }
00203 }
00204
00205 RunMonitorCollection& runMonCollection = sharedResources_->
00206 statisticsReporter_->getRunMonitorCollection();
00207 runMonCollection.getRunNumbersSeenMQ().addSample(ioc.runNumber());
00208 runMonCollection.getLumiSectionsSeenMQ().addSampleIfLarger(ioc.lumiSection());
00209 runMonCollection.getErrorEventIDsReceivedMQ().addSample(ioc.eventNumber());
00210
00211 DataSenderMonitorCollection& dataSenderMonColl = sharedResources_->
00212 statisticsReporter_->getDataSenderMonitorCollection();
00213 dataSenderMonColl.addErrorEventSample(ioc);
00214
00215 break;
00216 }
00217
00218 default:
00219 {
00220 std::ostringstream msg;
00221 msg << "I2OChain with unknown message type " <<
00222 ioc.messageCode();
00223 XCEPT_DECLARE( stor::exception::WrongI2OMessageType,
00224 xcept, msg.str());
00225 sharedResources_->statisticsReporter_->
00226 alarmHandler()->notifySentinel(AlarmHandler::ERROR, xcept);
00227
00228
00229
00230 DataSenderMonitorCollection& dataSenderMonColl = sharedResources_->
00231 statisticsReporter_->getDataSenderMonitorCollection();
00232 dataSenderMonColl.addFaultyEventSample(ioc);
00233
00234 break;
00235 }
00236 }
00237 }
00238
00239 const bool EventDistributor::full() const
00240 {
00241 return sharedResources_->streamQueue_->full();
00242 }
00243
00244
00245 void EventDistributor::registerEventConsumer
00246 (
00247 const EventConsRegPtr regPtr
00248 )
00249 {
00250 ConsSelPtr evtSel( new EventConsumerSelector(regPtr) );
00251
00252 InitMsgSharedPtr initMsgPtr =
00253 sharedResources_->initMsgCollection_->getElementForOutputModule(
00254 regPtr->outputModuleLabel()
00255 );
00256 if ( initMsgPtr.get() != 0 )
00257 {
00258 uint8* initPtr = &(*initMsgPtr)[0];
00259 InitMsgView initView(initPtr);
00260 try
00261 {
00262 evtSel->initialize( initView );
00263 }
00264 catch( stor::exception::InvalidEventSelection& e )
00265 {
00266 sharedResources_->statisticsReporter_->alarmHandler()->
00267 notifySentinel(AlarmHandler::ERROR, e);
00268 }
00269 }
00270
00271 eventConsumerSelectors_.insert( evtSel );
00272 }
00273
00274 void EventDistributor::registerDQMEventConsumer( const DQMEventConsRegPtr ptr )
00275 {
00276 DQMEvtSelPtr dqmEvtSel( new DQMEventSelector(ptr) );
00277 dqmEventSelectors_.insert( dqmEvtSel );
00278 }
00279
00280 void EventDistributor::registerEventStreams( const EvtStrConfigListPtr cl )
00281 {
00282 for( EvtStrConfigList::const_iterator it = cl->begin(), itEnd = cl->end();
00283 it != itEnd;
00284 ++it )
00285 {
00286 EvtSelPtr evtSel( new EventStreamSelector(*it) );
00287 eventStreamSelectors_.insert( evtSel );
00288 }
00289 }
00290
00291
00292 void EventDistributor::registerErrorStreams( const ErrStrConfigListPtr cl )
00293 {
00294 for( ErrStrConfigList::const_iterator it = cl->begin(), itEnd = cl->end();
00295 it != itEnd;
00296 ++it )
00297 {
00298 ErrSelPtr errSel( new ErrorStreamSelector(*it) );
00299 errorStreamSelectors_.insert( errSel );
00300 }
00301 }
00302
00303
00304 void EventDistributor::clearStreams()
00305 {
00306 eventStreamSelectors_.clear();
00307 errorStreamSelectors_.clear();
00308 }
00309
00310
00311 unsigned int EventDistributor::configuredStreamCount() const
00312 {
00313 return eventStreamSelectors_.size() +
00314 errorStreamSelectors_.size();
00315 }
00316
00317
00318 unsigned int EventDistributor::initializedStreamCount() const
00319 {
00320 unsigned int counter = 0;
00321 for (EvtSelList::const_iterator it = eventStreamSelectors_.begin(),
00322 itEnd = eventStreamSelectors_.end();
00323 it != itEnd;
00324 ++it)
00325 {
00326 if ( (*it)->isInitialized() )
00327 ++counter;
00328 }
00329 return counter;
00330 }
00331
00332
00333 void EventDistributor::clearConsumers()
00334 {
00335 eventConsumerSelectors_.clear();
00336 dqmEventSelectors_.clear();
00337 }
00338
00339
00340 unsigned int EventDistributor::configuredConsumerCount() const
00341 {
00342 return eventConsumerSelectors_.size() + dqmEventSelectors_.size();
00343 }
00344
00345
00346 unsigned int EventDistributor::initializedConsumerCount() const
00347 {
00348 unsigned int counter = 0;
00349 for (ConsSelList::const_iterator it = eventConsumerSelectors_.begin(),
00350 itEnd = eventConsumerSelectors_.end();
00351 it != itEnd;
00352 ++it)
00353 {
00354 if ( (*it)->isInitialized() )
00355 ++counter;
00356 }
00357 return counter;
00358 }
00359
00360
00361 void EventDistributor::checkForStaleConsumers()
00362 {
00363 utils::TimePoint_t now = utils::getCurrentTime();
00364
00365 EventQueueCollectionPtr eqc =
00366 sharedResources_->eventQueueCollection_;
00367 eqc->clearStaleQueues(now);
00368
00369 DQMEventQueueCollectionPtr dqc =
00370 sharedResources_->dqmEventQueueCollection_;
00371 dqc->clearStaleQueues(now);
00372 }
00373
00374