Go to the documentation of this file.00001
00003
00004 #include "toolbox/task/WorkLoopFactory.h"
00005 #include "xcept/tools.h"
00006
00007 #include "EventFilter/StorageManager/interface/Exception.h"
00008 #include "EventFilter/StorageManager/interface/DQMEventProcessor.h"
00009 #include "EventFilter/StorageManager/interface/DQMEventProcessorResources.h"
00010 #include "EventFilter/StorageManager/interface/DQMEventQueueCollection.h"
00011 #include "EventFilter/StorageManager/interface/QueueID.h"
00012 #include "EventFilter/StorageManager/interface/StatisticsReporter.h"
00013 #include "EventFilter/StorageManager/src/DQMEventStore.icc"
00014
00015
00016 namespace stor {
00017
00019
00021
00022 template<>
00023 DQMEventMsgView
00024 DQMEventStore<I2OChain,InitMsgCollection,SharedResources>::
00025 getDQMEventView(I2OChain const& dqmEvent)
00026 {
00027 tempEventArea_.clear();
00028 dqmEvent.copyFragmentsIntoBuffer(tempEventArea_);
00029 return DQMEventMsgView(&tempEventArea_[0]);
00030 }
00031
00032
00033
00034 DQMEventProcessor::DQMEventProcessor(xdaq::Application* app, SharedResourcesPtr sr) :
00035 app_(app),
00036 sharedResources_(sr),
00037 actionIsActive_(true),
00038 latestLumiSection_(0),
00039 discardDQMUpdatesForOlderLS_(0),
00040 dqmEventStore_
00041 (
00042 app->getApplicationDescriptor(),
00043 sr->dqmEventQueueCollection_,
00044 sr->statisticsReporter_->getDQMEventMonitorCollection(),
00045 sr->initMsgCollection_.get(),
00046 &stor::InitMsgCollection::maxMsgCount,
00047 sr.get(),
00048 &stor::SharedResources::moveToFailedState,
00049 sr->statisticsReporter_->alarmHandler()
00050 )
00051 {
00052 WorkerThreadParams workerParams =
00053 sharedResources_->configuration_->getWorkerThreadParams();
00054 timeout_ = workerParams.DQMEPdeqWaitTime_;
00055 }
00056
00057
00058 DQMEventProcessor::~DQMEventProcessor()
00059 {
00060
00061 actionIsActive_ = false;
00062
00063
00064 processWL_->cancel();
00065 }
00066
00067
00068 void DQMEventProcessor::startWorkLoop(std::string workloopName)
00069 {
00070 try
00071 {
00072 std::string identifier = utils::getIdentifier(app_->getApplicationDescriptor());
00073
00074 processWL_ = toolbox::task::getWorkLoopFactory()->
00075 getWorkLoop( identifier + workloopName, "waiting" );
00076
00077 if ( ! processWL_->isActive() )
00078 {
00079 toolbox::task::ActionSignature* processAction =
00080 toolbox::task::bind(this, &DQMEventProcessor::processDQMEvents,
00081 identifier + "ProcessNextDQMEvent");
00082 processWL_->submit(processAction);
00083
00084 processWL_->activate();
00085 }
00086 }
00087 catch (xcept::Exception& e)
00088 {
00089 std::string msg = "Failed to start workloop 'DQMEventProcessor' with 'processNextDQMEvent'.";
00090 XCEPT_RETHROW(stor::exception::DQMEventProcessing, msg, e);
00091 }
00092 }
00093
00094
00095 bool DQMEventProcessor::processDQMEvents(toolbox::task::WorkLoop*)
00096 {
00097 std::string errorMsg = "Failed to process a DQM event: ";
00098
00099 try
00100 {
00101 processNextDQMEvent();
00102 }
00103 catch(xcept::Exception &e)
00104 {
00105 XCEPT_DECLARE_NESTED( stor::exception::DQMEventProcessing,
00106 sentinelException, errorMsg, e );
00107 sharedResources_->moveToFailedState(sentinelException);
00108 }
00109 catch(std::exception &e)
00110 {
00111 errorMsg += e.what();
00112 XCEPT_DECLARE( stor::exception::DQMEventProcessing,
00113 sentinelException, errorMsg );
00114 sharedResources_->moveToFailedState(sentinelException);
00115 }
00116 catch(...)
00117 {
00118 errorMsg += "Unknown exception";
00119 XCEPT_DECLARE( stor::exception::DQMEventProcessing,
00120 sentinelException, errorMsg );
00121 sharedResources_->moveToFailedState(sentinelException);
00122 }
00123
00124 return actionIsActive_;
00125 }
00126
00127
00128 void DQMEventProcessor::processNextDQMEvent()
00129 {
00130 DQMEventQueue::ValueType dqmEvent;
00131 DQMEventQueuePtr eq = sharedResources_->dqmEventQueue_;
00132 utils::TimePoint_t startTime = utils::getCurrentTime();
00133 if (eq->deqTimedWait(dqmEvent, timeout_))
00134 {
00135 utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00136 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00137 addDQMEventProcessorIdleSample(elapsedTime);
00138
00139 if (
00140 (discardDQMUpdatesForOlderLS_ > 0) &&
00141 (dqmEvent.first.lumiSection() + discardDQMUpdatesForOlderLS_ < latestLumiSection_)
00142 )
00143
00144 {
00145
00146 sharedResources_->statisticsReporter_->getDQMEventMonitorCollection().
00147 getDroppedDQMEventCountsMQ().addSample(dqmEvent.second + 1);
00148 }
00149 else
00150 {
00151 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00152 addPoppedDQMEventSample(dqmEvent.first.memoryUsed());
00153 sharedResources_->statisticsReporter_->getDQMEventMonitorCollection().
00154 getDroppedDQMEventCountsMQ().addSample(dqmEvent.second);
00155
00156 latestLumiSection_ = std::max(latestLumiSection_, dqmEvent.first.lumiSection());
00157 dqmEventStore_.addDQMEvent(dqmEvent.first);
00158 }
00159 }
00160 else
00161 {
00162 utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00163 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00164 addDQMEventProcessorIdleSample(elapsedTime);
00165 }
00166
00167 DQMEventProcessorResources::Requests requests;
00168 DQMProcessingParams dqmParams;
00169 boost::posix_time::time_duration newTimeoutValue;
00170 if (sharedResources_->dqmEventProcessorResources_->
00171 getRequests(requests, dqmParams, newTimeoutValue))
00172 {
00173 if (requests.configuration)
00174 {
00175 timeout_ = newTimeoutValue;
00176 dqmEventStore_.setParameters(dqmParams);
00177 discardDQMUpdatesForOlderLS_ = dqmParams.discardDQMUpdatesForOlderLS_;
00178 }
00179 if (requests.endOfRun)
00180 {
00181 endOfRun();
00182 }
00183 if (requests.storeDestruction)
00184 {
00185 dqmEventStore_.clear();
00186 }
00187 sharedResources_->dqmEventProcessorResources_->requestsDone();
00188 }
00189 }
00190
00191 void DQMEventProcessor::endOfRun()
00192 {
00193 dqmEventStore_.purge();
00194 }
00195
00196 }
00197