Go to the documentation of this file.00001
00003
00004 #include "toolbox/task/WorkLoopFactory.h"
00005 #include "xcept/tools.h"
00006
00007 #include "EventFilter/StorageManager/interface/Exception.h"
00008 #include "EventFilter/StorageManager/interface/DQMEventProcessor.h"
00009 #include "EventFilter/StorageManager/interface/DQMEventProcessorResources.h"
00010 #include "EventFilter/StorageManager/interface/DQMEventQueueCollection.h"
00011 #include "EventFilter/StorageManager/interface/QueueID.h"
00012 #include "EventFilter/StorageManager/src/DQMEventStore.icc"
00013
00014
00015 namespace stor {
00016
00018
00020
00021 template<>
00022 DQMEventMsgView
00023 DQMEventStore<I2OChain,DataSenderMonitorCollection,AlarmHandler>::
00024 getDQMEventView(I2OChain const& dqmEvent)
00025 {
00026 tempEventArea_.clear();
00027 dqmEvent.copyFragmentsIntoBuffer(tempEventArea_);
00028 return DQMEventMsgView(&tempEventArea_[0]);
00029 }
00030
00031
00032
00033 DQMEventProcessor::DQMEventProcessor(xdaq::Application* app, SharedResourcesPtr sr) :
00034 app_(app),
00035 sharedResources_(sr),
00036 actionIsActive_(true),
00037 latestLumiSection_(0),
00038 discardDQMUpdatesForOlderLS_(0),
00039 dqmEventStore_
00040 (
00041 app->getApplicationDescriptor(),
00042 sr->dqmEventQueueCollection_,
00043 sr->statisticsReporter_->getDQMEventMonitorCollection(),
00044 &sr->statisticsReporter_->getDataSenderMonitorCollection(),
00045 &stor::DataSenderMonitorCollection::getConnectedEPs,
00046 sr->alarmHandler_.get(),
00047 &stor::AlarmHandler::moveToFailedState,
00048 sr->alarmHandler_
00049 )
00050 {
00051 WorkerThreadParams workerParams =
00052 sharedResources_->configuration_->getWorkerThreadParams();
00053 timeout_ = workerParams.DQMEPdeqWaitTime_;
00054 }
00055
00056
00057 DQMEventProcessor::~DQMEventProcessor()
00058 {
00059
00060 actionIsActive_ = false;
00061
00062
00063 processWL_->cancel();
00064 }
00065
00066
00067 void DQMEventProcessor::startWorkLoop(std::string workloopName)
00068 {
00069 try
00070 {
00071 std::string identifier = utils::getIdentifier(app_->getApplicationDescriptor());
00072
00073 processWL_ = toolbox::task::getWorkLoopFactory()->
00074 getWorkLoop( identifier + workloopName, "waiting" );
00075
00076 if ( ! processWL_->isActive() )
00077 {
00078 toolbox::task::ActionSignature* processAction =
00079 toolbox::task::bind(this, &DQMEventProcessor::processDQMEvents,
00080 identifier + "ProcessNextDQMEvent");
00081 processWL_->submit(processAction);
00082
00083 processWL_->activate();
00084 }
00085 }
00086 catch (xcept::Exception& e)
00087 {
00088 std::string msg = "Failed to start workloop 'DQMEventProcessor' with 'processNextDQMEvent'.";
00089 XCEPT_RETHROW(stor::exception::DQMEventProcessing, msg, e);
00090 }
00091 }
00092
00093
00094 bool DQMEventProcessor::processDQMEvents(toolbox::task::WorkLoop*)
00095 {
00096 std::string errorMsg = "Failed to process a DQM event: ";
00097
00098 try
00099 {
00100 processNextDQMEvent();
00101 }
00102 catch(xcept::Exception &e)
00103 {
00104 XCEPT_DECLARE_NESTED( stor::exception::DQMEventProcessing,
00105 sentinelException, errorMsg, e );
00106 sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
00107 }
00108 catch(std::exception &e)
00109 {
00110 errorMsg += e.what();
00111 XCEPT_DECLARE( stor::exception::DQMEventProcessing,
00112 sentinelException, errorMsg );
00113 sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
00114 }
00115 catch(...)
00116 {
00117 errorMsg += "Unknown exception";
00118 XCEPT_DECLARE( stor::exception::DQMEventProcessing,
00119 sentinelException, errorMsg );
00120 sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
00121 }
00122
00123 return actionIsActive_;
00124 }
00125
00126
00127 void DQMEventProcessor::processNextDQMEvent()
00128 {
00129 DQMEventQueue::ValueType dqmEvent;
00130 DQMEventQueuePtr eq = sharedResources_->dqmEventQueue_;
00131 utils::TimePoint_t startTime = utils::getCurrentTime();
00132 if (eq->deqTimedWait(dqmEvent, timeout_))
00133 {
00134 utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00135 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00136 addDQMEventProcessorIdleSample(elapsedTime);
00137
00138 if (
00139 (discardDQMUpdatesForOlderLS_ > 0) &&
00140 (dqmEvent.first.lumiSection() + discardDQMUpdatesForOlderLS_ < latestLumiSection_)
00141 )
00142
00143 {
00144
00145 sharedResources_->statisticsReporter_->getDQMEventMonitorCollection().
00146 getDroppedDQMEventCountsMQ().addSample(dqmEvent.second + 1);
00147 }
00148 else
00149 {
00150 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00151 addPoppedDQMEventSample(dqmEvent.first.memoryUsed());
00152 sharedResources_->statisticsReporter_->getDQMEventMonitorCollection().
00153 getDroppedDQMEventCountsMQ().addSample(dqmEvent.second);
00154
00155 latestLumiSection_ = std::max(latestLumiSection_, dqmEvent.first.lumiSection());
00156 dqmEventStore_.addDQMEvent(dqmEvent.first);
00157 }
00158 }
00159 else
00160 {
00161 utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00162 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00163 addDQMEventProcessorIdleSample(elapsedTime);
00164 }
00165
00166 DQMEventProcessorResources::Requests requests;
00167 DQMProcessingParams dqmParams;
00168 boost::posix_time::time_duration newTimeoutValue;
00169 if (sharedResources_->dqmEventProcessorResources_->
00170 getRequests(requests, dqmParams, newTimeoutValue))
00171 {
00172 if (requests.configuration)
00173 {
00174 timeout_ = newTimeoutValue;
00175 dqmEventStore_.setParameters(dqmParams);
00176 discardDQMUpdatesForOlderLS_ = dqmParams.discardDQMUpdatesForOlderLS_;
00177 }
00178 if (requests.endOfRun)
00179 {
00180 endOfRun();
00181 }
00182 if (requests.storeDestruction)
00183 {
00184 dqmEventStore_.clear();
00185 }
00186 sharedResources_->dqmEventProcessorResources_->requestsDone();
00187 }
00188 }
00189
00190 void DQMEventProcessor::endOfRun()
00191 {
00192 dqmEventStore_.purge();
00193 latestLumiSection_ = 0;
00194 }
00195
00196 }
00197